# HG changeset patch # User jbe # Date 1434483872 -7200 # Node ID fb54c76e1484e4fb4092eddeb87a152f87a56981 # Parent 32d7423a6753c74159a9d0c90b967d1e6c6bcb1d Completed new HTTP module implementation (untested yet) diff -r 32d7423a6753 -r fb54c76e1484 moonbridge_http.lua --- a/moonbridge_http.lua Tue Jun 16 02:49:34 2015 +0200 +++ b/moonbridge_http.lua Tue Jun 16 21:44:32 2015 +0200 @@ -240,7 +240,7 @@ local remaining_body_size_limit = body_size_limit -- table for caching nil values: local headers_value_nil = {} - -- create a new request object: + -- create a new request object with metatable: local request -- allow references to local variable request = { -- allow access to underlying socket: @@ -289,7 +289,7 @@ for i, line in ipairs(request.headers[key]) do result[#result+1] = line end - result = string.concat(result, ", ") + result = table.concat(result, ", ") self[key] = result return result end @@ -335,6 +335,11 @@ end }) } + -- create metatable for request object: + local request_mt = {} + setmetatable(request, request_mt) + -- callback for request body streaming: + local process_body_chunk -- local variables to track the state: local state = "init" -- one of: -- "init" (initial state) @@ -350,6 +355,8 @@ local content_length = nil -- value of Content-Length header sent local chunk_parts = {} -- list of chunks to send local chunk_bytes = 0 -- sum of lengths of chunks to send + local streamed_post_params = {} -- mapping from POST field name to stream function + local streamed_post_param_patterns = {} -- list of POST field pattern and stream function pairs -- functions to assert proper output/closing: local function assert_output(...) local retval, errmsg = ... @@ -389,6 +396,7 @@ assert_close(socket:close()) else send_flush() + process_body_chunk = nil consume_all() end end @@ -449,8 +457,6 @@ end return data end - -- callback for request body streaming: - local process_body_chunk -- reads a number of bytes from the socket, -- optionally feeding these bytes chunk-wise -- into a callback function: @@ -508,9 +514,177 @@ read_body_bytes(request_body_content_length) end end - -- function to prepare (or skip) body processing: + -- function to setup default request body handling: + local function default_request_body_handling() + local post_params_list, post_params = new_params_list() + local content_type = request.headers_value["Content-Type"] + if content_type then + if + content_type == "application/x-www-form-urlencoded" or + string.match(content_type, "^application/x%-www%-form%-urlencoded *;") + then + read_urlencoded_form(post_params_list, request.body) + request.post_params_list, request.post_params = post_params_list, post_params + else + local boundary = string.match( + content_type, + '^multipart/form%-data[ \t]*[;,][ \t]*boundary="([^"]+)"$' + ) or string.match( + content_type, + '^multipart/form%-data[ \t]*[;,][ \t]*boundary=([^"; \t]+)$' + ) + if boundary then + local post_metadata_list, post_metadata = new_params_list() + boundary = "--" .. boundary + local headerdata = "" + local streamer + local field_name + local metadata = {} + local value_parts + local function default_streamer(chunk) + value_parts[#value_parts+1] = chunk + end + local function stream_part_finish() + if streamer == default_streamer then + local value = table.concat(value_parts) + value_parts = nil + if field_name then + local values = post_params_list[field_name] + values[#values+1] = value + local metadata_entries = post_metadata_list[field_name] + metadata_entries[#metadata_entries+1] = metadata + end + else + streamer() + end + headerdata = "" + streamer = nil + field_name = nil + metadata = {} + end + local function stream_part_chunk(chunk) + if streamer then + streamer(chunk) + else + headerdata = headerdata .. chunk + while true do + local line, remaining = string.match(headerdata, "^(.-)\r?\n(.*)$") + if not line then + break + end + if line == "" then + streamer = streamed_post_params[field_name] + if not streamer then + for i, rule in ipairs(streamed_post_param_patterns) do + if string.match(field_name, rule[1]) then + streamer = rule[2] + break + end + end + end + if not streamer then + value_parts = {} + streamer = default_streamer + end + streamer(remaining, field_name, metadata) + return + end + headerdata = remaining + local header_key, header_value = string.match(line, "^([^:]*):[ \t]*(.-)[ \t]*$") + if not header_key then + request_error(true, "400 Bad Request", "Invalid header in multipart/form-data part") + end + header_key = string.lower(header_key) + if header_key == "content-disposition" then + local escaped_header_value = string.gsub(header_value, '"[^"]*"', function(str) + return string.gsub(str, "=", "==") + end) + field_name = string.match(escaped_header_value, ';[ \t]*name="([^"]*)"') + if field_name then + field_name = string.gsub(field_name, "==", "=") + else + field_name = string.match(header_value, ';[ \t]*name=([^"; \t]+)') + end + metadata.file_name = string.match(escaped_header_value, ';[ \t]*filename="([^"]*)"') + if metadata.file_name then + metadata.file_name = string.gsub(metadata.file_name, "==", "=") + else + string.match(header_value, ';[ \t]*filename=([^"; \t]+)') + end + elseif header_key == "content-type" then + metadata.content_type = header_value + elseif header_key == "content-transfer-encoding" then + request_error(true, "400 Bad Request", "Content-transfer-encoding not supported by multipart/form-data parser") + end + end + end + end + local skippart = true -- ignore data until first boundary + local afterbound = false -- interpret 2 bytes after boundary ("\r\n" or "--") + local terminated = false -- final boundary read + local bigchunk = "" + request:set_request_body_streamer(function(chunk) + if chunk == nil then + if not terminated then + request_error(true, "400 Bad Request", "Premature end of multipart/form-data request body") + end + request.post_metadata_list, request.post_metadata = post_metadata_list, post_metadata + end + if terminated then + return + end + bigchunk = bigchunk .. chunk + while true do + if afterbound then + if #bigchunk <= 2 then + return + end + local terminator = string.sub(bigchunk, 1, 2) + if terminator == "\r\n" then + afterbound = false + bigchunk = string.sub(bigchunk, 3) + elseif terminator == "--" then + terminated = true + bigchunk = nil + return + else + request_error(true, "400 Bad Request", "Error while parsing multipart body (expected CRLF or double minus)") + end + end + local pos1, pos2 = string.find(bigchunk, boundary, 1, true) + if not pos1 then + if not skippart then + local safe = #bigchunk-#boundary + if safe > 0 then + stream_part_chunk(string.sub(bigchunk, 1, safe)) + bigchunk = string.sub(bigchunk, safe+1) + end + end + return + end + if not skippart then + stream_part_chunk(string.sub(bigchunk, 1, pos1 - 1)) + stream_part_finish() + else + boundary = "\r\n" .. boundary + skippart = false + end + bigchunk = string.sub(bigchunk, pos2 + 1) + afterbound = true + end + end) + else + request_error(true, "415 Unsupported Media Type", "Unknown Content-Type of request body") + end + end + end + end + -- function to prepare body processing: local function prepare() assert_not_faulty() + if process_body_chunk == nil then + default_request_body_handling() + end if state ~= "init" then return end @@ -711,6 +885,74 @@ error("Unexpected internal status in HTTP engine") end end + -- method to register POST param stream handler for a single field name: + function request:stream_post_param(field_name, callback) + if state ~= "init" then + error("Cannot setup request body streamer at this stage") + end + streamed_post_params[field_name] = callback + end + -- method to register POST param stream handler for a field name pattern: + function request:stream_post_params(pattern, callback) + if state ~= "init" then + error("Cannot setup request body streamer at this stage") + end + streamed_post_param_patterns[#streamed_post_param_patterns+1] = {pattern, callback} + end + -- method to register request body stream handler + function request:set_request_body_streamer(callback) + if state ~= "init" then + error("Cannot setup request body streamer at this stage") + end + local inprogress = false + local buffer = {} + process_body_chunk = function(chunk) + if inprogress then + buffer[#buffer+1] = chunk + else + inprogress = true + callback(chunk) + while #buffer > 0 do + chunk = table.concat(buffer) + buffer = {} + callback(chunk) + end + inprogress = false + end + end + end + -- method to start reading request body + function request:consume_input() + prepare() + consume_all() + end + -- method to stream request body + function request:stream_request_body(callback) + request:set_request_body_streamer(function(chunk) + if chunk ~= nil then + callback(chunk) + end + end) + request:consume_input() + end + -- metamethod to read special attibutes of request object: + function request_mt:__index(key, value) + if key == "body" then + local chunks = {} + request:stream_request_body(function(chunk) + chunks[#chunks+1] = chunk + end) + self.body = table.concat(chunks) + return self.body + elseif + key == "post_params_list" or key == "post_params" or + key == "post_metadata_list" or key == "post_metadata" + then + prepare() + consume_all() + return self[key] + end + end -- wait for input: if not poll(socket_set, nil, request_idle_timeout) then return request_error(false, "408 Request Timeout", "Idle connection timed out")