# HG changeset patch # User jbe # Date 1433976594 -7200 # Node ID 573995950b0be7e23e7eb5bd849feeb39223d877 # Parent bd7225b303913b585aaf4e6e64e804b3c404b003 Restarted work on new HTTP module implementation diff -r bd7225b30391 -r 573995950b0b moonbridge_http.lua --- a/moonbridge_http.lua Fri Jun 05 19:53:41 2015 +0200 +++ b/moonbridge_http.lua Thu Jun 11 00:49:54 2015 +0200 @@ -108,105 +108,23 @@ return newtbl end -local headers_mt_self = setmetatable({}, {__mode="k"}) - -local headers_mts = { - headers_mt = { - __index = function(tbl, key) - local self = headers_mt_self[tbl] - local lowerkey = string.lower(key) - local result = self._headers[lowerkey] - if result == nil then - result = {} - end - tbl[lowerkey] = result - tbl[key] = result - return result - end - }, - -- table mapping header field names to value-lists - -- (for headers with comma separated values): - headers_csv_table = { - __index = function(tbl, key) - local self = headers_mt_self[tbl] - local result = {} - for i, line in ipairs(self.headers[key]) do - for entry in string.gmatch(line, "[^,]+") do - local value = string.match(entry, "^[ \t]*(..-)[ \t]*$") - if value then - result[#result+1] = value - end - end - end - tbl[key] = result - return result - end - }, - -- table mapping header field names to a comma separated string - -- (for headers with comma separated values): - headers_csv_string = { - __index = function(tbl, key) - local self = headers_mt_self[tbl] - local result = {} - for i, line in ipairs(self.headers[key]) do - result[#result+1] = line - end - result = string.concat(result, ", ") - tbl[key] = result - return result +function generate_handler(handler, options) + -- swap arguments if necessary (for convenience): + if type(handler) ~= "function" and type(options) == "function" then + handler, options = options, handler + end + -- helper function to process options: + local function default(name, default_value) + local value = options[name] + if value == nil then + return default_value + else + return value or nil end - }, - -- table mapping header field names to a single string value - -- (or false if header has been sent multiple times): - headers_value = { - __index = function(tbl, key) - local self = headers_mt_self[tbl] - if self._headers_value_nil[key] then - return nil - end - local result = nil - local values = self.headers_csv_table[key] - if #values == 0 then - self._headers_value_nil[key] = true - elseif #values == 1 then - result = values[1] - else - result = false - end - tbl[key] = result - return result - end - }, - -- table mapping header field names to a flag table, - -- indicating if the comma separated value contains certain entries: - headers_flags = { - __index = function(tbl, key) - local self = headers_mt_self[tbl] - local result = setmetatable({}, { - __index = function(tbl, key) - local lowerkey = string.lower(key) - local result = rawget(tbl, lowerkey) or false - tbl[lowerkey] = result - tbl[key] = result - return result - end - }) - for i, value in ipairs(self.headers_csv_table[key]) do - result[string.lower(value)] = true - end - tbl[key] = result - return result - end - } -} - -request_pt = {} -request_mt = { __index = request_pt } - -function request_pt:_init(handler, options) - self._application_handler = handler + end -- process options: options = options or {} + local preamble = "" -- preamble sent with every(!) HTTP response do -- named arg "static_headers" is used to create the preamble: local s = options.static_headers @@ -227,549 +145,53 @@ end end t[#t+1] = "" - self._preamble = table.concat(t, "\r\n") -- preamble sent with every(!) HTTP response - end - self._input_chunk_size = options.maximum_input_chunk_size or options.chunk_size or 16384 - self._output_chunk_size = options.minimum_output_chunk_size or options.chunk_size or 1024 - self._header_size_limit = options.header_size_limit or 1024*1024 - self._body_size_limit = options.body_size_limit or 64*1024*1024 - local function init_timeout(name, default) - local value = options[name] - if value == nil then - self["_"..name] = default - else - self["_"..name] = value - end - end - init_timeout("request_idle_timeout", 330) - init_timeout("request_header_timeout", 30) - init_timeout("request_body_timeout", 1800) - init_timeout("response_timeout", 1830) - self._poll = options.poll_function or moonbridge_io.poll - self:_create_closure("_write_yield") - self:_create_closure("_handler") - self:_create_header_metatables() -end - -function request_pt:_create_closure(name) - self[name.."_closure"] = function(...) - return self[name](self, ...) - end -end - -function request_pt:_handler(socket) - self._socket = socket - self._survive = true - self._socket_set = {[socket] = true} - self._faulty = false - self._state = "config" - self._connection_close_requested = false - self._connection_close_responded = false - for name, mt in pairs(headers_mts) do - local tbl = setmetatable({}, mt) - headers_mt_self[tbl] = self - self[name] = tbl - end - repeat - -- wait for input: - if not self._poll(self._socket_set, nil, self._request_idle_timeout) then - self:_error("408 Request Timeout", "Idle connection timed out") - return self._survive - end - -- read headers (with timeout): - do - local coro = coroutine.wrap(self._read_headers) - local timeout = self._request_header_timeout - local starttime = timeout and moonbridge_io.timeref() - while true do - local status = coro(self) - if status == nil then - local remaining - if timeout then - remaining = timeout - moonbridge_io.timeref(starttime) - end - if not self._poll(self._socket_set, nil, remaining) then - self:_error("408 Request Timeout", "Timeout while receiving headers") - return self._survive - end - elseif status == false then - return self._survive - elseif status == true then - break - else - error("Unexpected yield value") - end - end - end - -- prepare reading of body: - self._read_body_coro = coroutine.wrap(self._read_body) --TODO? - -- set timeout for application handler: - timeout(self._response_timeout or 0) - -- call application handler: - if self._application_handler(self) ~= true then - self._survive = false - end - -- enforce request:finish() - request:finish() - -- reset timeout of application handler - timeout(0) - until self._connection_close_responded - return self._survive -end - -function request_pt:_prepare_body() - self:_assert_not_faulty() - if self._state == "prepare" then - error("Unexpected state in HTTP module") - elseif self._state ~= "config" then - return + preamble = table.concat(t, "\r\n") end - self._state = "prepare" - local content_type = self.headers_value["Content-Type"] - if content_type then - if - content_type == "application/x-www-form-urlencoded" or - string.match(content_type, "^application/x%-www%-form%-urlencoded *;") - then - self._consume_all_input() - self.post_params_list = read_urlencoded_form(self.body) - else - local boundary = string.match( - content_type, - '^multipart/form%-data[ \t]*[;,][ \t]*boundary="([^"]+)"$' - ) or string.match( - content_type, - '^multipart/form%-data[ \t]*[;,][ \t]*boundary=([^"; \t]+)$' - ) - if boundary then - self.post_metadata_list = {} - boundary = "--" .. boundary - local headerdata = "" - local streamer - local field_name - local metadata = {} - local value_parts - local function default_streamer(chunk) - value_parts[#value_parts+1] = chunk - end - local function stream_part_finish() - if streamer == default_streamer then - local value = table.concat(value_parts) - value_parts = nil - if field_name then - local values = self.post_params_list[field_name] - values[#values+1] = value - local metadata_entries = post_metadata_list[field_name] - metadata_entries[#metadata_entries+1] = metadata - end - else - streamer() - end - headerdata = "" - streamer = nil - field_name = nil - metadata = {} - end - local function stream_part_chunk(chunk) - if streamer then - streamer(chunk) - else - headerdata = headerdata .. chunk - while true do - local line, remaining = string.match(headerdata, "^(.-)\r?\n(.*)$") - if not line then - break - end - if line == "" then - streamer = streamed_post_params[field_name] - if not streamer then - for i, rule in ipairs(streamed_post_param_patterns) do - if string.match(field_name, rule[1]) then - streamer = rule[2] - break - end - end - end - if not streamer then - value_parts = {} - streamer = default_streamer - end - streamer(remaining, field_name, metadata) - return - end - headerdata = remaining - local header_key, header_value = string.match(line, "^([^:]*):[ \t]*(.-)[ \t]*$") - if not header_key then - request_error(true, "400 Bad Request", "Invalid header in multipart/form-data part") - end - header_key = string.lower(header_key) - if header_key == "content-disposition" then - local escaped_header_value = string.gsub(header_value, '"[^"]*"', function(str) - return string.gsub(str, "=", "==") - end) - field_name = string.match(escaped_header_value, ';[ \t]*name="([^"]*)"') - if field_name then - field_name = string.gsub(field_name, "==", "=") - else - field_name = string.match(header_value, ';[ \t]*name=([^"; \t]+)') - end - metadata.file_name = string.match(escaped_header_value, ';[ \t]*filename="([^"]*)"') - if metadata.file_name then - metadata.file_name = string.gsub(metadata.file_name, "==", "=") - else - string.match(header_value, ';[ \t]*filename=([^"; \t]+)') - end - elseif header_key == "content-type" then - metadata.content_type = header_value - elseif header_key == "content-transfer-encoding" then - request_error(true, "400 Bad Request", "Content-transfer-encoding not supported by multipart/form-data parser") - end - end - end - end - local skippart = true -- ignore data until first boundary - local afterbound = false -- interpret 2 bytes after boundary ("\r\n" or "--") - local terminated = false -- final boundary read - local bigchunk = "" - request:stream_request_body(function(chunk) - if terminated then - return - end - bigchunk = bigchunk .. chunk - while true do - if afterbound then - if #bigchunk <= 2 then - return - end - local terminator = string.sub(bigchunk, 1, 2) - if terminator == "\r\n" then - afterbound = false - bigchunk = string.sub(bigchunk, 3) - elseif terminator == "--" then - terminated = true - bigchunk = nil - return - else - request_error(true, "400 Bad Request", "Error while parsing multipart body (expected CRLF or double minus)") - end - end - local pos1, pos2 = string.find(bigchunk, boundary, 1, true) - if not pos1 then - if not skippart then - local safe = #bigchunk-#boundary - if safe > 0 then - stream_part_chunk(string.sub(bigchunk, 1, safe)) - bigchunk = string.sub(bigchunk, safe+1) - end - end - return - end - if not skippart then - stream_part_chunk(string.sub(bigchunk, 1, pos1 - 1)) - stream_part_finish() - else - boundary = "\r\n" .. boundary - skippart = false - end - bigchunk = string.sub(bigchunk, pos2 + 1) - afterbound = true - end - end) - if not terminated then - request_error(true, "400 Bad Request", "Premature end of multipart/form-data request body") - end - request.post_metadata_list, request.post_metadata = post_metadata_list, post_metadata - else - request_error(true, "415 Unsupported Media Type", "Unknown Content-Type of request body") + local input_chunk_size = options.maximum_input_chunk_size or options.chunk_size or 16384 + local output_chunk_size = options.minimum_output_chunk_size or options.chunk_size or 1024 + local header_size_limit = options.header_size_limit or 1024*1024 + local body_size_limit = options.body_size_limit or 64*1024*1024 + local request_idle_timeout = default("request_idle_timeout", 330) + local request_header_timeout = default("request_header_timeout", 30) + local request_body_timeout = default("request_body_timeout", 60) + local request_response_timeout = default("request_response_timeout", 1800) + local poll = options.poll_function or moonbridge_io.poll + -- return socket handler: + return function(socket) + local socket_set = {[socket] = true} -- used for poll function + local survive = true -- set to false if process shall be terminated later + local consume -- function that reads some input if possible + -- function that drains some input if possible: + local function drain() + local bytes, status = assert(socket:drain_nb(input_chunk_size)) + if status == "eof" then + consume = nil end end - end - self.post_params = get_first_values(self.post_params_list) - self._state = "no_status_sent" -end - -function request_pt:_drain_input() - self._read_body_coro = "drain" -end - -function request_pt:_consume_some_input() - local coro = self._read_body_coro - if coro == "drain" then - local bytes, status = self._socket:drain_nb(self._input_chunk_size) - if status == "eof" then - coro = nil - end - elseif coro then - local retval = coro(self) - if retval ~= nil then - coro = nil -- can't consume more data - end - end -end - -function request_pt:_consume_all_input() - while self._read_body_coro do - self._poll(socket_set) - self:_consume_some_input() - end -end - -function request_pt:_error(status, explanation) -end - -function request_pt:_read(...) - local line, status = self._socket:read_yield(...) - if line == nil then - self._faulty = true - error(status) - else - return line, status - end -end - -function request_pt:_read_headers() - local remaining = self._header_size_limit - -- read and parse request line: - local target, proto - do - local line, status = self:_read(remaining-2, "\n") - if status == "maxlen" then - self:_error("414 Request-URI Too Long") - return false - elseif status == "eof" then - if line ~= "" then - self:_error("400 Bad Request", "Unexpected EOF in request-URI line") - end - return false - end - remaining = remaining - #line - self.method, target, proto = - line:match("^([^ \t\r]+)[ \t]+([^ \t\r]+)[ \t]*([^ \t\r]*)[ \t]*\r?\n$") - if not request.method then - self:_error("400 Bad Request", "Invalid request-URI line") - return false - elseif proto ~= "HTTP/1.1" then - self:_error("505 HTTP Version Not Supported") - return false - end - end - -- read and parse headers: - self._headers = {} - self._headers_value_nil = {} - while true do - local line, status = self:_read(remaining, "\n"); - if status == "maxlen" then - self:_error("431 Request Header Fields Too Large") - return false - elseif status == "eof" then - self:_error("400 Bad Request", "Unexpected EOF in request headers") - return false - end - remaining = remaining - #line - if line == "\r\n" or line == "\n" then - break - end - local key, value = string.match(line, "^([^ \t\r]+):[ \t]*(.-)[ \t]*\r?\n$") - if not key then - self:_error("400 Bad Request", "Invalid header line") - return false - end - local lowerkey = key:lower() - local values = self._headers[lowerkey] - if values then - values[#values+1] = value - else - self._headers[lowerkey] = {value} - end - end - -- process "Connection: close" header if existent: - self._connection_close_requested = self.headers_flags["Connection"]["close"] - -- process "Content-Length" header if existent: - do - local values = self.headers_csv_table["Content-Length"] - if #values > 0 then - self._request_body_content_length = tonumber(values[1]) - local proper_value = tostring(request_body_content_length) - for i, value in ipairs(values) do - value = string.match(value, "^0*(.*)") - if value ~= proper_value then - self:_error("400 Bad Request", "Content-Length header(s) invalid") - return false - end - end - if request_body_content_length > self._body_size_limit then - self:_error("413 Request Entity Too Large", "Announced request body size is too big") - return false + local function unblock() + if consume then + poll(socket_set, socket_set) + consume() + else + poll(nil, socket_set) end end - end - -- process "Transfer-Encoding" header if existent: - do - local flag = self.headers_flags["Transfer-Encoding"]["chunked"] - local list = self.headers_csv_table["Transfer-Encoding"] - if (flag and #list ~= 1) or (not flag and #list ~= 0) then - self:_error("400 Bad Request", "Unexpected Transfer-Encoding") - return false - end - end - -- process "Expect" header if existent: - for i, value in ipairs(self.headers_csv_table["Expect"]) do - if string.lower(value) ~= "100-continue" then - self:_error("417 Expectation Failed", "Unexpected Expect header") - return false - end - end - -- get mandatory Host header according to RFC 7230: - self.host = self.headers_value["Host"] - if not self.host then - self:_error("400 Bad Request", "No valid host header") - return false - end - -- parse request target: - self.path, self.query = string.match(target, "^/([^?]*)(.*)$") - if not self.path then - local host2 - host2, self.path, self.query = string.match(target, "^[Hh][Tt][Tt][Pp]://([^/?]+)/?([^?]*)(.*)$") - if host2 then - if self.host ~= host2 then - self:_error("400 Bad Request", "No valid host header") - return false - end - elseif not (target == "*" and self.method == "OPTIONS") then - self:_error("400 Bad Request", "Invalid request target") - return false - end - end - -- parse GET params: - if self.query then - self.get_params_list = read_urlencoded_form(request.query) - self.get_params = get_first_values(self.get_params_list) - end - -- parse cookies: - self.cookies = {} - for i, line in ipairs(self.headers["Cookie"]) do - for rawkey, rawvalue in - string.gmatch(line, "([^=; ]*)=([^=; ]*)") - do - self.cookies[decode_uri(rawkey)] = decode_uri(rawvalue) - end - end - -- indicate success: - return true -end - -function request_pt:_read_body() - local remaining = self._body_size_limit - if request.headers_flags["Transfer-Encoding"]["chunked"] then - while true do - local line, status = self:_read(32 + remaining, "\n") - if status == "maxlen" then - self:_error("400 Bad Request", "Request body size limit exceeded") - return false - elseif status == "eof" then - self:_error("400 Bad Request", "Encoding error or unexpected EOF while reading next chunk of request body") - return false - end - local zeros, lenstr = string.match(line, "^(0*)([1-9A-Fa-f]+[0-9A-Fa-f]*)\r?\n$") - local chunkext - if lenstr then - chunkext = "" - else - zeros, lenstr, chunkext = string.match(line, "^(0*)([1-9A-Fa-f]+[0-9A-Fa-f]*)([ \t;].-)\r?\n$") - end - if not lenstr or #lenstr > 13 then - self:_error("400 Bad Request", "Encoding error while reading chunk of request body") - return false + repeat + local request = { + socket = socket, + cookies = {} + } + local function send(...) + assert(socket:write_call(unblock, ...)) end - local len = tonumber("0x" .. lenstr) - remaining = remaining - (#zeros + #chunkext + len) - if remaining < 0 then - self:_error("400 Bad Request", "Request body size limit exceeded") - return false - end - if len == 0 then break end - if self:_read_body_bytes(len) == false then - return false - end - local term, status = self:_read(2, "\n") - if status == "eof" then - self:_error("400 Bad Request", "Unexpected EOF while reading next chunk of request body") - return false - end - if term ~= "\r\n" and term ~= "\n" then - self:_error("400 Bad Request", "Encoding error while reading chunk of request body") - return false + -- wait for input: + if not poll(socket_set, nil, request_idle_timeout) then + -- TODO: send error + return survive end - end - while true do - local line, status = self:_read(2 + remaining, "\n") - if status == "eof" then - self:_error("400 Bad Request", "Unexpected EOF while reading chunk of request body") - return false - end - if line == "\r\n" or line == "\n" then break end - remaining = remaining - #line - if remaining < 0 then - self:_error("413 Request Entity Too Large", "Request body size limit exceeded while reading trailer section of chunked request body") - return false - end - end - elseif request_body_content_length then - if self._read_body_bytes(request_body_content_length) == false then - return false - end + until connection_close_responded + return survive end - -- indicate success: - return true -end - -function request_pt:_read_body_bytes(remaining, callback) - while remaining > 0 do - local limit - if remaining > self._input_chunk_size then - limit = self._input_chunk_size - else - limit = remaining - end - local chunk, status = self:_read(limit) - if status == "eof" then - self:_error("400 Bad Request", "Unexpected EOF while reading chunk of request body") - return false - end - remaining = remaining - limit - if self._body_streamer then - self._body_streamer(chunk) - end - end - return true -end - -function request_pt:_assert_not_faulty() - assert(not self._faulty, "Tried to use faulty request handle") -end - -function request_pt:_write_yield() - self:_consume_some_input() - self._poll(self._socket_set, self._socket_set) -end - -function request_pt:_write(...) - assert(self._socket:write_call(self._write_yield_closure, ...)) -end - -function request_pt:_flush(...) - assert(self._socket:write_call(self._write_yield_closure, ...)) -end - --- function creating a HTTP handler: -function generate_handler(handler, options) - -- swap arguments if necessary (for convenience): - if type(handler) ~= "function" and type(options) == "function" then - handler, options = options, handler - end - local request = setmetatable({}, request_mt) - request:_init(handler, options) - return request._handler_closure end return _M