jbe@58: jbe@58: /*** Version ***/ jbe@229: #define MOONBR_VERSION_STRING "1.0.1" jbe@58: jbe@0: jbe@0: /*** Compile-time configuration ***/ jbe@0: jbe@0: #define MOONBR_LUA_PANIC_BUG_WORKAROUND 1 jbe@0: jbe@0: jbe@0: /*** C preprocessor macros for portability support ***/ jbe@0: jbe@0: #ifndef __has_include jbe@0: #define __has_include(x) 0 jbe@0: #endif jbe@0: jbe@0: jbe@0: /*** Include directives for used system libraries ***/ jbe@0: jbe@0: #include jbe@0: #include jbe@135: #include jbe@0: #include jbe@135: #include jbe@135: #include jbe@135: #include jbe@135: #include jbe@135: #include jbe@136: #include jbe@0: #include jbe@0: #include jbe@0: #include jbe@0: #include jbe@125: #include jbe@125: #include jbe@135: #include jbe@0: #include jbe@135: #include jbe@0: #if defined(__FreeBSD__) || __has_include() jbe@0: #include jbe@0: #endif jbe@0: #if defined(__linux__) || __has_include() jbe@0: #include jbe@0: #endif jbe@0: #if defined(__linux__) || __has_include() jbe@0: #include jbe@0: #endif jbe@0: jbe@0: jbe@0: /*** Fallback definitions for missing constants on some platforms ***/ jbe@0: jbe@0: /* INFTIM is used as timeout parameter for poll() */ jbe@0: #ifndef INFTIM jbe@0: #define INFTIM -1 jbe@0: #endif jbe@0: jbe@0: jbe@0: /*** Include directives for Lua ***/ jbe@0: jbe@0: #include jbe@0: #include jbe@0: #include jbe@0: jbe@0: jbe@79: /*** Include directive for moonbridge_io library ***/ jbe@79: jbe@79: #include "moonbridge_io.h" jbe@79: jbe@79: jbe@0: /*** Constants ***/ jbe@0: jbe@0: /* Backlog option for listen() call */ jbe@0: #define MOONBR_LISTEN_BACKLOG 1024 jbe@0: jbe@0: /* Maximum length of a timestamp used for strftime() */ jbe@0: #define MOONBR_LOG_MAXTIMELEN 40 jbe@0: jbe@0: /* Maximum length of a log message */ jbe@0: #define MOONBR_LOG_MAXMSGLEN 4095 jbe@0: jbe@0: /* Exitcodes passed to exit() call */ jbe@0: #define MOONBR_EXITCODE_GRACEFUL 0 jbe@0: #define MOONBR_EXITCODE_CMDLINEERROR 1 jbe@0: #define MOONBR_EXITCODE_ALREADYRUNNING 2 jbe@0: #define MOONBR_EXITCODE_STARTUPERROR 3 jbe@0: #define MOONBR_EXITCODE_RUNTIMEERROR 4 jbe@0: jbe@0: /* Maximum length of a line sent to stderr by child processes */ jbe@0: #define MOONBR_MAXERRORLINELEN 1024 jbe@0: jbe@0: /* Maximum length of an error string returned by strerror() */ jbe@0: #define MOONBR_MAXSTRERRORLEN 80 jbe@0: jbe@213: /* Error message for noncompliant strerror_r() implementation on GNU systems */ jbe@213: #define MOONBR_STRERROR_R_MSG "Error detail unavailable due to noncompliant strerror_r() implementation" jbe@213: jbe@0: /* Status bytes exchanged between master and child processes */ jbe@124: #define MOONBR_STATUS_IDLE 'I' jbe@124: #define MOONBR_COMMAND_CONNECT 'C' jbe@124: #define MOONBR_COMMAND_TERMINATE 'T' jbe@124: #define MOONBR_STATUS_GOODBYE 'B' jbe@0: jbe@0: /* Constant file descriptors */ jbe@0: #define MOONBR_FD_STDERR 2 jbe@0: #define MOONBR_FD_CONTROL 3 jbe@0: #define MOONBR_FD_END 4 jbe@0: jbe@0: /* Return values of moonbr_try_destroy_worker() */ jbe@0: #define MOONBR_DESTROY_NONE 0 jbe@0: #define MOONBR_DESTROY_PREPARE 1 jbe@0: #define MOONBR_DESTROY_IDLE_OR_ASSIGNED 2 jbe@0: jbe@0: jbe@0: /*** Types ***/ jbe@0: jbe@0: /* Enum for 'moonbr_pstate' */ jbe@0: #define MOONBR_PSTATE_STARTUP 0 jbe@0: #define MOONBR_PSTATE_RUNNING 1 jbe@0: #define MOONBR_PSTATE_FORKED 2 jbe@0: jbe@0: /* Enum for 'proto' field of struct moonbr_listener */ jbe@274: #define MOONBR_PROTO_INTERVAL 1 jbe@274: #define MOONBR_PROTO_LOCAL 2 jbe@274: #define MOONBR_PROTO_TCP 3 jbe@0: jbe@0: /* Data structure for a pool's listener that can accept incoming connections */ jbe@0: struct moonbr_listener { jbe@0: struct moonbr_pool *pool; jbe@0: struct moonbr_listener *prev_listener; /* previous idle or(!) connected listener */ jbe@0: struct moonbr_listener *next_listener; /* next idle or(!) connected listener */ jbe@0: int proto; jbe@0: union { jbe@0: struct { jbe@0: char *name; /* name of interval passed to 'connect' function as 'interval' field in table */ jbe@274: int main; /* nonzero = termination of 'connect' function causes shutdown */ jbe@0: int strict; /* nonzero = runtime of 'connect' function does not delay interval */ jbe@0: struct timeval delay; /* interval between invocations of 'connect' function */ jbe@0: struct timeval wakeup; /* point in time of next invocation */ jbe@0: } interval; jbe@0: struct { jbe@125: union { jbe@125: struct sockaddr addr_abstract; jbe@125: struct sockaddr_un addr_un; jbe@125: struct sockaddr_in addr_in; jbe@125: struct sockaddr_in6 addr_in6; jbe@125: } addr; jbe@125: socklen_t addrlen; jbe@125: } socket; jbe@125: } type_specific; jbe@125: union { jbe@0: struct { jbe@125: char ip[INET6_ADDRSTRLEN]; /* IP to listen on */ jbe@0: int port; /* port number to listen on (in host endianess) */ jbe@0: } tcp; jbe@0: } proto_specific; jbe@0: int listenfd; /* -1 = none */ jbe@0: int pollidx; /* -1 = none */ jbe@0: }; jbe@0: jbe@0: /* Data structure for a child process that is handling incoming connections */ jbe@0: struct moonbr_worker { jbe@0: struct moonbr_pool *pool; jbe@0: struct moonbr_worker *prev_worker; jbe@0: struct moonbr_worker *next_worker; jbe@0: struct moonbr_worker *prev_idle_worker; jbe@0: struct moonbr_worker *next_idle_worker; jbe@127: int main; /* nonzero = terminate Moonbridge when this worker dies */ jbe@0: int idle; /* nonzero = waiting for command from parent process */ jbe@0: int assigned; /* nonzero = currently handling a connection */ jbe@0: pid_t pid; jbe@0: int controlfd; /* socket to send/receive control message to/from child process */ jbe@0: int errorfd; /* socket to receive error output from child process' stderr */ jbe@0: char *errorlinebuf; /* optional buffer for collecting stderr data from child process */ jbe@0: int errorlinelen; /* number of bytes stored in 'errorlinebuf' */ jbe@0: int errorlineovf; /* nonzero = line length overflow */ jbe@0: struct timeval idle_expiration; /* point in time until child process may stay in idle state */ jbe@0: struct moonbr_listener *restart_interval_listener; /* set while interval listener is assigned */ jbe@0: }; jbe@0: jbe@0: /* Data structure for a pool of workers and listeners */ jbe@0: struct moonbr_pool { jbe@0: int poolnum; /* number of pool for log output */ jbe@0: struct moonbr_pool *next_pool; /* next entry in linked list starting with 'moonbr_first_pool' */ jbe@0: struct moonbr_worker *first_worker; /* first worker of pool */ jbe@0: struct moonbr_worker *last_worker; /* last worker of pool */ jbe@0: struct moonbr_worker *first_idle_worker; /* first idle worker of pool */ jbe@0: struct moonbr_worker *last_idle_worker; /* last idle worker of pool */ jbe@0: int idle_worker_count; jbe@0: int unassigned_worker_count; jbe@0: int total_worker_count; jbe@0: int worker_count_stat; /* only needed for statistics */ jbe@0: int pre_fork; /* desired minimum number of unassigned workers */ jbe@0: int min_fork; /* desired minimum number of workers in total */ jbe@0: int max_fork; /* maximum number of workers */ jbe@0: struct timeval fork_delay; /* delay after each fork() until a fork may happen again */ jbe@0: struct timeval fork_wakeup; /* point in time when a fork may happen again (unless a worker terminates before) */ jbe@0: struct timeval fork_error_delay; /* delay between fork()s when an error during fork or preparation occurred */ jbe@0: struct timeval fork_error_wakeup; /* point in time when fork may happen again if an error in preparation occurred */ jbe@0: int use_fork_error_wakeup; /* nonzero = error in preparation occured; gets reset on next fork */ jbe@0: struct timeval exit_delay; /* delay for terminating excessive workers (unassigned_worker_count > pre_fork) */ jbe@0: struct timeval exit_wakeup; /* point in time when terminating an excessive worker */ jbe@0: struct timeval idle_timeout; /* delay before an idle worker is terminated */ jbe@0: size_t memory_limit; /* maximum bytes of memory that the Lua machine may allocate */ jbe@0: int listener_count; /* total number of listeners of pool (and size of 'listener' array at end of this struct) */ jbe@0: struct moonbr_listener *first_idle_listener; /* first listener that is idle (i.e. has no waiting connection) */ jbe@0: struct moonbr_listener *last_idle_listener; /* last listener that is idle (i.e. has no waiting connection) */ jbe@0: struct moonbr_listener *first_connected_listener; /* first listener that has a pending connection */ jbe@0: struct moonbr_listener *last_connected_listener; /* last listener that has a pending connection */ jbe@0: struct moonbr_listener listener[1]; /* static array of variable(!) size to contain 'listener' structures */ jbe@0: }; jbe@0: jbe@0: /* Enum for 'channel' field of struct moonbr_poll_worker */ jbe@0: #define MOONBR_POLL_WORKER_CONTROLCHANNEL 1 jbe@0: #define MOONBR_POLL_WORKER_ERRORCHANNEL 2 jbe@0: jbe@0: /* Structure to refer from 'moonbr_poll_worker_fds' entry to worker structure */ jbe@0: struct moonbr_poll_worker { jbe@0: struct moonbr_worker *worker; jbe@0: int channel; /* field indicating whether file descriptor is 'controlfd' or 'errorfd' */ jbe@0: }; jbe@0: jbe@0: /* Variable indicating that clean shutdown was requested */ jbe@0: static int moonbr_shutdown_in_progress = 0; jbe@0: jbe@0: jbe@0: /*** Macros for Lua registry ***/ jbe@0: jbe@0: /* Lightuserdata keys for Lua registry to store 'prepare', 'connect', and 'finish' functions */ jbe@0: #define moonbr_luakey_prepare_func(pool) ((void *)(intptr_t)(pool) + 0) jbe@0: #define moonbr_luakey_connect_func(pool) ((void *)(intptr_t)(pool) + 1) jbe@0: #define moonbr_luakey_finish_func(pool) ((void *)(intptr_t)(pool) + 2) jbe@0: jbe@0: jbe@0: /*** Global variables ***/ jbe@0: jbe@0: /* State of process execution */ jbe@0: static int moonbr_pstate = MOONBR_PSTATE_STARTUP; jbe@0: jbe@0: /* Process ID of the main process */ jbe@0: static pid_t moonbr_masterpid; jbe@0: jbe@0: /* Condition variables set by the signal handler */ jbe@0: static volatile sig_atomic_t moonbr_cond_poll = 0; jbe@0: static volatile sig_atomic_t moonbr_cond_terminate = 0; jbe@0: static volatile sig_atomic_t moonbr_cond_interrupt = 0; jbe@0: static volatile sig_atomic_t moonbr_cond_child = 0; jbe@0: jbe@0: /* Socket pair to denote signal delivery when signal handler was called just before poll() */ jbe@0: static int moonbr_poll_signalfds[2]; jbe@0: #define moonbr_poll_signalfd_read moonbr_poll_signalfds[0] jbe@0: #define moonbr_poll_signalfd_write moonbr_poll_signalfds[1] jbe@0: jbe@0: /* Global variables for pidfile and logging */ jbe@0: static struct pidfh *moonbr_pidfh = NULL; jbe@0: static FILE *moonbr_logfile = NULL; jbe@0: static int moonbr_use_syslog = 0; jbe@0: jbe@0: /* First and last entry of linked list of all created pools during initialization */ jbe@0: static struct moonbr_pool *moonbr_first_pool = NULL; jbe@0: static struct moonbr_pool *moonbr_last_pool = NULL; jbe@0: jbe@0: /* Total count of pools */ jbe@0: static int moonbr_pool_count = 0; jbe@0: jbe@0: /* Set to a nonzero value if dynamic part of 'moonbr_poll_fds' ('moonbr_poll_worker_fds') needs an update */ jbe@0: static int moonbr_poll_refresh_needed = 0; jbe@0: jbe@0: /* Array passed to poll(), consisting of static part and dynamic part ('moonbr_poll_worker_fds') */ jbe@0: static struct pollfd *moonbr_poll_fds = NULL; /* the array */ jbe@0: static int moonbr_poll_fds_bufsize = 0; /* memory allocated for this number of elements */ jbe@0: static int moonbr_poll_fds_count = 0; /* total number of elements */ jbe@0: static int moonbr_poll_fds_static_count; /* number of elements in static part */ jbe@0: jbe@0: /* Dynamic part of 'moonbr_poll_fds' array */ jbe@0: #define moonbr_poll_worker_fds (moonbr_poll_fds+moonbr_poll_fds_static_count) jbe@0: jbe@0: /* Additional information for dynamic part of 'moonbr_poll_fds' array */ jbe@0: struct moonbr_poll_worker *moonbr_poll_workers; /* the array */ jbe@0: static int moonbr_poll_workers_bufsize = 0; /* memory allocated for this number of elements */ jbe@0: static int moonbr_poll_worker_count = 0; /* number of elements in array */ jbe@0: jbe@0: /* Variable set to nonzero value to disallow further calls of 'listen' function */ jbe@0: static int moonbr_booted = 0; jbe@0: jbe@0: /* Verbosity settings */ jbe@0: static int moonbr_debug = 0; jbe@0: static int moonbr_stat = 0; jbe@0: jbe@0: /* Memory consumption by Lua machine */ jbe@0: static size_t moonbr_memory_usage = 0; jbe@0: static size_t moonbr_memory_limit = 0; jbe@0: jbe@0: jbe@0: /*** Functions for signal handling ***/ jbe@0: jbe@0: /* Signal handler for master and child processes */ jbe@0: static void moonbr_signal(int sig) { jbe@265: int errno2 = errno; /* backup errno variable */ jbe@0: if (getpid() == moonbr_masterpid) { jbe@0: /* master process */ jbe@0: switch (sig) { jbe@0: case SIGHUP: jbe@0: case SIGINT: jbe@0: /* fast shutdown requested */ jbe@0: moonbr_cond_interrupt = 1; jbe@0: break; jbe@0: case SIGTERM: jbe@0: /* clean shutdown requested */ jbe@0: moonbr_cond_terminate = 1; jbe@0: break; jbe@0: case SIGCHLD: jbe@0: /* child process terminated */ jbe@0: moonbr_cond_child = 1; jbe@0: break; jbe@0: } jbe@0: if (moonbr_cond_poll) { jbe@0: /* avoid race condition if signal handler is invoked right before poll() */ jbe@0: char buf[1] = {0}; jbe@20: write(moonbr_poll_signalfd_write, buf, 1); jbe@0: } jbe@0: } else { jbe@0: /* child process forwards certain signals to parent process */ jbe@0: switch (sig) { jbe@0: case SIGHUP: jbe@0: case SIGINT: jbe@0: kill(moonbr_masterpid, sig); jbe@0: } jbe@0: } jbe@265: errno = errno2; /* restore errno from backup */ jbe@0: } jbe@0: jbe@0: /* Initialize signal handling */ jbe@0: static void moonbr_signal_init(){ jbe@0: moonbr_masterpid = getpid(); jbe@0: signal(SIGHUP, moonbr_signal); jbe@0: signal(SIGINT, moonbr_signal); jbe@0: signal(SIGTERM, moonbr_signal); jbe@0: signal(SIGCHLD, moonbr_signal); jbe@0: } jbe@0: jbe@0: jbe@0: /*** Functions for logging in master process ***/ jbe@0: jbe@0: /* Logs a pre-formatted message with given syslog() priority */ jbe@0: static void moonbr_log_msg(int priority, const char *msg) { jbe@0: if (moonbr_logfile) { jbe@0: /* logging to logfile desired (timestamp is prepended in that case) */ jbe@0: time_t now_time = 0; jbe@0: struct tm now_tmstruct; jbe@0: char timestr[MOONBR_LOG_MAXTIMELEN+1]; jbe@0: time(&now_time); jbe@0: localtime_r(&now_time, &now_tmstruct); jbe@0: if (!strftime( jbe@0: timestr, MOONBR_LOG_MAXTIMELEN+1, "%Y-%m-%d %H:%M:%S %Z: ", &now_tmstruct jbe@0: )) timestr[0] = 0; jbe@0: fprintf(moonbr_logfile, "%s%s\n", timestr, msg); jbe@0: } jbe@0: if (moonbr_use_syslog) { jbe@0: /* logging through syslog desired */ jbe@0: syslog(priority, "%s", msg); jbe@0: } jbe@0: } jbe@0: jbe@0: /* Formats a message via vsnprintf() and logs it with given syslog() priority */ jbe@0: static void moonbr_log(int priority, const char *message, ...) { jbe@0: char msgbuf[MOONBR_LOG_MAXMSGLEN+1]; /* buffer of static size to store formatted message */ jbe@0: int msglen; /* length of full message (may exceed MOONBR_LOG_MAXMSGLEN) */ jbe@0: { jbe@0: /* pass variable arguments to vsnprintf() to format message */ jbe@0: va_list ap; jbe@0: va_start(ap, message); jbe@0: msglen = vsnprintf(msgbuf, MOONBR_LOG_MAXMSGLEN+1, message, ap); jbe@0: va_end(ap); jbe@0: } jbe@0: { jbe@0: /* split and log message line by line */ jbe@0: char *line = msgbuf; jbe@0: while (1) { jbe@0: char *endptr = strchr(line, '\n'); jbe@0: if (endptr) { jbe@0: /* terminate string where newline character is found */ jbe@0: *endptr = 0; jbe@0: } else if (line != msgbuf && msglen > MOONBR_LOG_MAXMSGLEN) { jbe@0: /* break if line is incomplete and not the first line */ jbe@0: break; jbe@0: } jbe@0: moonbr_log_msg(priority, line); jbe@0: if (!endptr) break; /* break if end of formatted message is reached */ jbe@0: line = endptr+1; /* otherwise continue with remaining message */ jbe@0: } jbe@0: } jbe@0: if (msglen > MOONBR_LOG_MAXMSGLEN) { jbe@0: /* print warning if message was truncated */ jbe@0: moonbr_log_msg(priority, "Previous log message has been truncated due to excessive length"); jbe@0: } jbe@0: } jbe@0: jbe@0: jbe@0: /*** Termination function ***/ jbe@0: jbe@0: /* Kill all child processes, remove PID file (if existent), and exit master process with given exitcode */ jbe@0: static void moonbr_terminate(int exitcode) { jbe@0: { jbe@0: struct moonbr_pool *pool; jbe@0: for (pool=moonbr_first_pool; pool; pool=pool->next_pool) { jbe@0: { jbe@0: struct moonbr_worker *worker; jbe@0: for (worker=pool->first_worker; worker; worker=worker->next_worker) { jbe@0: moonbr_log(LOG_INFO, "Sending SIGKILL to child with PID %i", (int)worker->pid); jbe@0: if (kill(worker->pid, SIGKILL)) { jbe@0: moonbr_log(LOG_ERR, "Error while killing child process: %s", strerror(errno)); jbe@0: } jbe@0: } jbe@0: } jbe@0: { jbe@0: int i; jbe@0: for (i=0; ilistener_count; i++) { jbe@0: struct moonbr_listener *listener = &pool->listener[i]; jbe@0: if (listener->proto == MOONBR_PROTO_LOCAL) { jbe@125: moonbr_log(LOG_INFO, "Unlinking local socket \"%s\"", listener->type_specific.socket.addr.addr_un.sun_path); jbe@125: if (unlink(listener->type_specific.socket.addr.addr_un.sun_path)) { jbe@0: moonbr_log(LOG_ERR, "Error while unlinking local socket: %s", strerror(errno)); jbe@0: } jbe@0: } jbe@0: } jbe@0: } jbe@0: } jbe@0: } jbe@0: moonbr_log(exitcode ? LOG_ERR : LOG_NOTICE, "Terminating with exit code %i", exitcode); jbe@0: if (moonbr_pidfh && pidfile_remove(moonbr_pidfh)) { jbe@0: moonbr_log(LOG_ERR, "Error while removing PID file: %s", strerror(errno)); jbe@0: } jbe@0: exit(exitcode); jbe@0: } jbe@0: jbe@0: /* Terminate with either MOONBR_EXITCODE_STARTUPERROR or MOONBR_EXITCODE_RUNTIMEERROR */ jbe@0: #define moonbr_terminate_error() \ jbe@0: moonbr_terminate( \ jbe@0: moonbr_pstate == MOONBR_PSTATE_STARTUP ? \ jbe@0: MOONBR_EXITCODE_STARTUPERROR : \ jbe@0: MOONBR_EXITCODE_RUNTIMEERROR \ jbe@0: ) jbe@0: jbe@0: jbe@0: /*** Helper functions ***/ jbe@0: jbe@0: /* Fills a 'struct timeval' structure with the current time (using CLOCK_MONOTONIC) */ jbe@0: static void moonbr_now(struct timeval *now) { jbe@0: struct timespec ts = {0, }; jbe@0: if (clock_gettime(CLOCK_MONOTONIC, &ts)) { jbe@0: moonbr_log(LOG_CRIT, "Error in clock_gettime() call: %s", strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: *now = (struct timeval){ .tv_sec = ts.tv_sec, .tv_usec = ts.tv_nsec / 1000 }; jbe@0: } jbe@0: jbe@0: /* Formats a 'struct timeval' value (not thread-safe) */ jbe@0: static char *moonbr_format_timeval(struct timeval *t) { jbe@0: static char buf[32]; jbe@0: snprintf(buf, 32, "%ji.%06ji seconds", (intmax_t)t->tv_sec, (intmax_t)t->tv_usec); jbe@0: return buf; jbe@0: } jbe@0: jbe@0: jbe@0: /*** Functions for pool creation and startup ***/ jbe@0: jbe@0: /* Creates a 'struct moonbr_pool' structure with a given number of listeners */ jbe@0: static struct moonbr_pool *moonbr_create_pool(int listener_count) { jbe@0: struct moonbr_pool *pool; jbe@0: pool = calloc(1, jbe@0: sizeof(struct moonbr_pool) + /* size of 'struct moonbr_pool' with one listener */ jbe@0: (listener_count-1) * sizeof(struct moonbr_listener) /* size of extra listeners */ jbe@0: ); jbe@0: if (!pool) { jbe@0: moonbr_log(LOG_CRIT, "Memory allocation error"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: pool->listener_count = listener_count; jbe@0: { jbe@0: /* initialization of listeners */ jbe@0: int i; jbe@0: for (i=0; ilistener[i]; jbe@0: listener->pool = pool; jbe@0: listener->listenfd = -1; jbe@0: listener->pollidx = -1; jbe@0: } jbe@0: } jbe@0: return pool; jbe@0: } jbe@0: jbe@0: /* Destroys a 'struct moonbr_pool' structure before it has been started */ jbe@0: static void moonbr_destroy_pool(struct moonbr_pool *pool) { jbe@0: int i; jbe@0: for (i=0; ilistener_count; i++) { jbe@0: struct moonbr_listener *listener = &pool->listener[i]; jbe@0: if ( jbe@0: listener->proto == MOONBR_PROTO_INTERVAL && jbe@125: listener->type_specific.interval.name jbe@0: ) { jbe@125: free(listener->type_specific.interval.name); jbe@0: } jbe@0: } jbe@0: free(pool); jbe@0: } jbe@0: jbe@0: /* Starts a all listeners in a pool */ jbe@0: static int moonbr_start_pool(struct moonbr_pool *pool) { jbe@0: moonbr_log(LOG_INFO, "Creating pool", pool->poolnum); jbe@0: { jbe@0: int i; jbe@0: for (i=0; ilistener_count; i++) { jbe@0: struct moonbr_listener *listener = &pool->listener[i]; jbe@0: switch (listener->proto) { jbe@0: case MOONBR_PROTO_INTERVAL: jbe@274: if (listener->type_specific.interval.main) { jbe@274: /* nothing to do here: starting main thread is performed in moonbr_run() function */ jbe@274: if (!listener->type_specific.interval.name) { jbe@274: moonbr_log(LOG_INFO, "Adding unnamed main thread"); jbe@274: } else { jbe@274: moonbr_log(LOG_INFO, "Adding main thread \"%s\"", listener->type_specific.interval.name); jbe@274: } jbe@0: } else { jbe@274: /* nothing to do here: starting intervals is performed in moonbr_run() function */ jbe@274: if (!listener->type_specific.interval.name) { jbe@274: moonbr_log(LOG_INFO, "Adding unnamed interval listener"); jbe@274: } else { jbe@274: moonbr_log(LOG_INFO, "Adding interval listener \"%s\"", listener->type_specific.interval.name); jbe@274: } jbe@0: } jbe@0: break; jbe@0: case MOONBR_PROTO_LOCAL: jbe@125: moonbr_log(LOG_INFO, "Adding local socket listener for path \"%s\"", listener->type_specific.socket.addr.addr_un.sun_path); jbe@125: listener->listenfd = socket(PF_LOCAL, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); jbe@125: if (listener->listenfd == -1) goto moonbr_start_pool_error; jbe@125: if (!unlink(listener->type_specific.socket.addr.addr_un.sun_path)) { jbe@125: moonbr_log(LOG_WARNING, "Unlinked named socket \"%s\" prior to listening", listener->type_specific.socket.addr.addr_un.sun_path); jbe@125: } else { jbe@125: if (errno != ENOENT) { jbe@125: moonbr_log(LOG_ERR, "Could not unlink named socket \"%s\" prior to listening: %s", listener->type_specific.socket.addr.addr_un.sun_path, strerror(errno)); jbe@125: } jbe@125: } jbe@125: if ( jbe@125: bind(listener->listenfd, &listener->type_specific.socket.addr.addr_abstract, listener->type_specific.socket.addrlen) jbe@125: ) goto moonbr_start_pool_error; jbe@125: if (listen(listener->listenfd, MOONBR_LISTEN_BACKLOG)) goto moonbr_start_pool_error; jbe@125: break; jbe@125: case MOONBR_PROTO_TCP: jbe@125: moonbr_log(LOG_INFO, "Adding TCP listener on interface \"%s\", port %i", listener->proto_specific.tcp.ip, listener->proto_specific.tcp.port); jbe@134: listener->listenfd = socket(listener->type_specific.socket.addr.addr_abstract.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); /* NOTE: not correctly using PF_* but AF_* constants here */ jbe@125: if (listener->listenfd == -1) goto moonbr_start_pool_error; jbe@0: { jbe@125: /* avoid "Address already in use" error when restarting service */ jbe@125: static const int reuseval = 1; jbe@125: if (setsockopt( jbe@125: listener->listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseval, sizeof(reuseval) jbe@125: )) goto moonbr_start_pool_error; jbe@0: } jbe@0: { jbe@125: /* default to send TCP RST when process terminates unexpectedly */ jbe@125: static const struct linger lingerval = { jbe@125: .l_onoff = 1, jbe@125: .l_linger = 0 jbe@0: }; jbe@125: if (setsockopt( jbe@125: listener->listenfd, SOL_SOCKET, SO_LINGER, &lingerval, sizeof(lingerval) jbe@125: )) goto moonbr_start_pool_error; jbe@0: } jbe@125: if ( jbe@125: bind(listener->listenfd, &listener->type_specific.socket.addr.addr_abstract, listener->type_specific.socket.addrlen) jbe@125: ) goto moonbr_start_pool_error; jbe@125: if (listen(listener->listenfd, MOONBR_LISTEN_BACKLOG)) goto moonbr_start_pool_error; jbe@0: break; jbe@0: default: jbe@0: moonbr_log(LOG_CRIT, "Internal error (should not happen): Unexpected value in listener.proto field"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: goto moonbr_start_pool_ok; jbe@0: moonbr_start_pool_error: jbe@0: { jbe@0: int j = i; jbe@0: int errno2 = errno; jbe@0: for (; i>=0; i--) { jbe@0: struct moonbr_listener *listener = &pool->listener[i]; jbe@0: if (listener->listenfd != -1) close(listener->listenfd); jbe@0: } jbe@0: errno = errno2; jbe@0: return j; jbe@0: } jbe@0: } jbe@0: moonbr_start_pool_ok: jbe@0: pool->poolnum = ++moonbr_pool_count; jbe@0: moonbr_log(LOG_INFO, "Pool #%i created", pool->poolnum); jbe@0: if (moonbr_last_pool) moonbr_last_pool->next_pool = pool; jbe@0: else moonbr_first_pool = pool; jbe@0: moonbr_last_pool = pool; jbe@0: return -1; jbe@0: } jbe@0: jbe@0: jbe@0: /*** Function to send data and a file descriptor to child process */ jbe@0: jbe@0: /* Sends control message of one bye plus optional file descriptor plus optional pointer to child process */ jbe@0: static void moonbr_send_control_message(struct moonbr_worker *worker, char status, int fd, void *ptr) { jbe@0: { jbe@0: struct iovec iovector = { .iov_base = &status, .iov_len = 1 }; /* carrying status byte */ jbe@0: char control_message_buffer[CMSG_SPACE(sizeof(int))] = {0, }; /* used to transfer file descriptor */ jbe@0: struct msghdr message = { .msg_iov = &iovector, .msg_iovlen = 1 }; /* data structure passed to sendmsg() call */ jbe@0: if (moonbr_debug) { jbe@0: if (fd == -1) { jbe@0: moonbr_log(LOG_DEBUG, "Sending control message \"%c\" to child process in pool #%i (PID %i)", (int)status, worker->pool->poolnum, (int)worker->pid); jbe@0: } else { jbe@0: moonbr_log(LOG_DEBUG, "Sending control message \"%c\" with file descriptor #%i to child process in pool #%i (PID %i)", (int)status, fd, worker->pool->poolnum, (int)worker->pid); jbe@0: } jbe@0: } jbe@0: if (fd != -1) { jbe@0: /* attach control message with file descriptor */ jbe@0: message.msg_control = control_message_buffer; jbe@0: message.msg_controllen = CMSG_SPACE(sizeof(int)); jbe@0: { jbe@0: struct cmsghdr *control_message = CMSG_FIRSTHDR(&message); jbe@0: control_message->cmsg_level = SOL_SOCKET; jbe@0: control_message->cmsg_type = SCM_RIGHTS; jbe@0: control_message->cmsg_len = CMSG_LEN(sizeof(int)); jbe@18: memcpy(CMSG_DATA(control_message), &fd, sizeof(int)); jbe@0: } jbe@0: } jbe@0: while (sendmsg(worker->controlfd, &message, MSG_NOSIGNAL) < 0) { jbe@0: if (errno == EPIPE) { jbe@0: moonbr_log(LOG_ERR, "Error while communicating with idle child process in pool #%i (PID %i): %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); jbe@0: return; /* do not close socket; socket is closed when reading from it */ jbe@0: } jbe@0: if (errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Unexpected error while communicating with idle child process in pool #%i (PID %i): %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: } jbe@0: if (ptr) { jbe@0: char buf[sizeof(void *)]; jbe@0: char *pos = buf; jbe@0: int len = sizeof(void *); jbe@0: ssize_t written; jbe@0: if (moonbr_debug) { jbe@0: moonbr_log(LOG_DEBUG, "Sending memory pointer to child process in pool #%i (PID %i)", (int)status, worker->pool->poolnum, (int)worker->pid); jbe@0: } jbe@18: memcpy(buf, &ptr, sizeof(void *)); jbe@0: while (len) { jbe@0: written = send(worker->controlfd, pos, len, MSG_NOSIGNAL); jbe@0: if (written > 0) { jbe@0: pos += written; jbe@0: len -= written; jbe@0: } else if (errno == EPIPE) { jbe@0: moonbr_log(LOG_ERR, "Error while communicating with idle child process in pool #%i (PID %i): %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); jbe@0: return; /* do not close socket; socket is closed when reading from it */ jbe@0: } else if (errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Unexpected error while communicating with idle child process in pool #%i (PID %i): %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: } jbe@0: } jbe@0: jbe@0: jbe@0: /*** Functions running in child process ***/ jbe@0: jbe@0: /* Logs an error in child process */ jbe@0: static void moonbr_child_log(const char *message) { jbe@0: fprintf(stderr, "%s\n", message); jbe@0: } jbe@0: jbe@0: /* Logs a fatal error in child process and terminates process with error status */ jbe@0: static void moonbr_child_log_fatal(const char *message) { jbe@0: moonbr_child_log(message); jbe@0: exit(1); jbe@0: } jbe@0: jbe@0: /* Logs an error in child process while appending error string for global errno variable */ jbe@0: static void moonbr_child_log_errno(const char *message) { jbe@213: char errmsg[MOONBR_MAXSTRERRORLEN] = MOONBR_STRERROR_R_MSG; jbe@20: strerror_r(errno, errmsg, MOONBR_MAXSTRERRORLEN); /* use thread-safe call in case child created threads */ jbe@0: fprintf(stderr, "%s: %s\n", message, errmsg); jbe@0: } jbe@0: jbe@0: /* Logs a fatal error in child process while appending error string for errno and terminating process */ jbe@0: static void moonbr_child_log_errno_fatal(const char *message) { jbe@0: moonbr_child_log_errno(message); jbe@0: exit(1); jbe@0: } jbe@0: jbe@0: /* Receives a control message consisting of one character plus an optional file descriptor from parent process */ jbe@0: static void moonbr_child_receive_control_message(int socketfd, char *status, int *fd) { jbe@0: struct iovec iovector = { .iov_base = status, .iov_len = 1 }; /* reference to status byte variable */ jbe@0: char control_message_buffer[CMSG_SPACE(sizeof(int))] = {0, }; /* used to receive file descriptor */ jbe@0: struct msghdr message = { /* data structure passed to recvmsg() call */ jbe@0: .msg_iov = &iovector, jbe@0: .msg_iovlen = 1, jbe@0: .msg_control = control_message_buffer, jbe@0: .msg_controllen = CMSG_SPACE(sizeof(int)) jbe@0: }; jbe@0: { jbe@0: int received; jbe@0: while ((received = recvmsg(socketfd, &message, MSG_CMSG_CLOEXEC)) < 0) { jbe@0: if (errno != EINTR) { jbe@0: moonbr_child_log_errno_fatal("Error while trying to receive connection socket from parent process"); jbe@0: } jbe@0: } jbe@0: if (!received) { jbe@0: moonbr_child_log_fatal("Unexpected EOF while trying to receive connection socket from parent process"); jbe@0: } jbe@0: } jbe@0: { jbe@0: struct cmsghdr *control_message = CMSG_FIRSTHDR(&message); jbe@0: if (control_message) { jbe@0: if (control_message->cmsg_level != SOL_SOCKET) { jbe@0: moonbr_child_log_fatal("Received control message with cmsg_level not equal to SOL_SOCKET"); jbe@0: } jbe@0: if (control_message->cmsg_type != SCM_RIGHTS) { jbe@0: moonbr_child_log_fatal("Received control message with cmsg_type not equal to SCM_RIGHTS"); jbe@0: } jbe@18: memcpy(fd, CMSG_DATA(control_message), sizeof(int)); jbe@0: } else { jbe@0: *fd = -1; jbe@0: } jbe@0: } jbe@0: } jbe@0: jbe@0: /* Receives a pointer from parent process */ jbe@0: static void *moonbr_child_receive_pointer(int socketfd) { jbe@0: char buf[sizeof(void *)]; jbe@0: char *pos = buf; jbe@0: int len = sizeof(void *); jbe@0: ssize_t bytes_read; jbe@0: while (len) { jbe@0: bytes_read = recv(socketfd, pos, len, 0); jbe@0: if (bytes_read > 0) { jbe@0: pos += bytes_read; jbe@0: len -= bytes_read; jbe@0: } else if (!bytes_read) { jbe@0: moonbr_child_log_fatal("Unexpected EOF while trying to receive memory pointer from parent process"); jbe@0: } else if (errno != EINTR) { jbe@0: moonbr_child_log_errno_fatal("Error while trying to receive memory pointer from parent process"); jbe@0: } jbe@0: } jbe@18: { jbe@18: void *ptr; /* avoid breaking strict-aliasing rules */ jbe@18: memcpy(&ptr, buf, sizeof(void *)); jbe@18: return ptr; jbe@18: } jbe@0: } jbe@0: jbe@0: /* Main function of child process to be called after fork() and file descriptor rearrangement */ jbe@0: void moonbr_child_run(struct moonbr_pool *pool, lua_State *L) { jbe@0: char controlmsg; jbe@88: int fd; jbe@0: struct itimerval notimer = { { 0, }, { 0, } }; jbe@0: lua_rawgetp(L, LUA_REGISTRYINDEX, moonbr_luakey_prepare_func(pool)); jbe@0: if (lua_isnil(L, -1)) lua_pop(L, 1); jbe@0: else if (lua_pcall(L, 0, 0, 1)) { jbe@0: fprintf(stderr, "Error in \"prepare\" function: %s\n", lua_tostring(L, -1)); jbe@0: exit(1); jbe@0: } jbe@293: moonbr_io_catch_sigterm(L); // NOTE: should not fail jbe@0: while (1) { jbe@0: struct moonbr_listener *listener; jbe@0: if (setitimer(ITIMER_REAL, ¬imer, NULL)) { jbe@0: moonbr_child_log_errno_fatal("Could not reset ITIMER_REAL via setitimer()"); jbe@0: } jbe@0: controlmsg = MOONBR_STATUS_IDLE; jbe@0: if (write(MOONBR_FD_CONTROL, &controlmsg, 1) <= 0) { jbe@0: moonbr_child_log_errno_fatal("Error while sending ready message to parent process"); jbe@0: } jbe@88: moonbr_child_receive_control_message(MOONBR_FD_CONTROL, &controlmsg, &fd); jbe@0: if (!( jbe@124: (controlmsg == MOONBR_COMMAND_TERMINATE && fd == -1) || jbe@124: (controlmsg == MOONBR_COMMAND_CONNECT) jbe@0: )) { jbe@0: moonbr_child_log_fatal("Received illegal control message from parent process"); jbe@0: } jbe@0: if (controlmsg == MOONBR_COMMAND_TERMINATE) break; jbe@293: moonbr_io_sigterm_flag = 0; /* ignore any prior SIGTERM (can't be handled in blocking recv anyway) */ jbe@0: listener = moonbr_child_receive_pointer(MOONBR_FD_CONTROL); jbe@127: if ( jbe@127: listener->proto != MOONBR_PROTO_LOCAL && jbe@127: listener->proto != MOONBR_PROTO_TCP && jbe@127: fd >= 0 jbe@127: ) { jbe@124: moonbr_child_log_fatal("Received unexpected file descriptor from parent process"); jbe@127: } else if ( jbe@274: listener->proto != MOONBR_PROTO_INTERVAL && fd < 0 jbe@127: ) { jbe@124: moonbr_child_log_fatal("Missing file descriptor from parent process"); jbe@124: } jbe@119: if (fd >= 0) moonbr_io_pushhandle(L, fd); jbe@0: lua_rawgetp(L, LUA_REGISTRYINDEX, moonbr_luakey_connect_func(pool)); jbe@119: if (fd < 0) { jbe@0: lua_newtable(L); jbe@274: if (listener->proto == MOONBR_PROTO_INTERVAL) { jbe@131: lua_pushstring(L, jbe@131: listener->type_specific.interval.name ? jbe@131: listener->type_specific.interval.name : "" jbe@131: ); jbe@274: if (listener->type_specific.interval.main) { jbe@274: lua_setfield(L, -2, "main"); jbe@274: } else { jbe@274: lua_setfield(L, -2, "interval"); jbe@274: } jbe@131: } jbe@0: } else { jbe@88: lua_pushvalue(L, -2); jbe@0: } jbe@0: if (lua_pcall(L, 1, 1, 1)) { jbe@0: fprintf(stderr, "Error in \"connect\" function: %s\n", lua_tostring(L, -1)); jbe@0: exit(1); jbe@0: } jbe@119: if (fd >= 0) moonbr_io_closehandle(L, -2, 0); /* attemt clean close */ jbe@293: if ( jbe@293: moonbr_io_sigterm_flag || jbe@293: lua_type(L, -1) != LUA_TBOOLEAN || !lua_toboolean(L, -1) jbe@293: ) break; jbe@0: #ifdef MOONBR_LUA_PANIC_BUG_WORKAROUND jbe@0: lua_settop(L, 2); jbe@0: #else jbe@0: lua_settop(L, 1); jbe@0: #endif jbe@0: } jbe@0: controlmsg = MOONBR_STATUS_GOODBYE; jbe@0: if (write(MOONBR_FD_CONTROL, &controlmsg, 1) <= 0) { jbe@0: moonbr_child_log_errno_fatal("Error while sending goodbye message to parent process"); jbe@0: } jbe@0: if (close(MOONBR_FD_CONTROL) && errno != EINTR) { jbe@0: moonbr_child_log_errno("Error while closing control socket"); jbe@0: } jbe@0: lua_rawgetp(L, LUA_REGISTRYINDEX, moonbr_luakey_finish_func(pool)); jbe@0: if (lua_isnil(L, -1)) lua_pop(L, 1); jbe@0: else if (lua_pcall(L, 0, 0, 1)) { jbe@0: fprintf(stderr, "Error in \"finish\" function: %s\n", lua_tostring(L, -1)); jbe@0: exit(1); jbe@0: } jbe@0: lua_close(L); jbe@0: exit(0); jbe@0: } jbe@0: jbe@0: jbe@0: /*** Functions to spawn child process ***/ jbe@0: jbe@0: /* Helper function to send an error message to a file descriptor (not needing a file stream) */ jbe@0: static void moonbr_child_emergency_print(int fd, char *message) { jbe@0: size_t len = strlen(message); jbe@0: ssize_t written; jbe@0: while (len) { jbe@0: written = write(fd, message, len); jbe@0: if (written > 0) { jbe@0: message += written; jbe@0: len -= written; jbe@0: } else { jbe@0: if (written != -1 || errno != EINTR) break; jbe@0: } jbe@0: } jbe@0: } jbe@0: jbe@0: /* Helper function to send an error message plus a text for errno to a file descriptor and terminate the process */ jbe@0: static void moonbr_child_emergency_error(int fd, char *message) { jbe@0: int errno2 = errno; jbe@0: moonbr_child_emergency_print(fd, message); jbe@0: moonbr_child_emergency_print(fd, ": "); jbe@0: moonbr_child_emergency_print(fd, strerror(errno2)); jbe@0: moonbr_child_emergency_print(fd, "\n"); jbe@0: exit(1); jbe@0: } jbe@0: jbe@0: /* Creates a child process and (in case of success) registers it in the 'struct moonbr_pool' structure */ jbe@0: static int moonbr_create_worker(struct moonbr_pool *pool, lua_State *L) { jbe@0: struct moonbr_worker *worker; jbe@0: worker = calloc(1, sizeof(struct moonbr_worker)); jbe@0: if (!worker) { jbe@0: moonbr_log(LOG_CRIT, "Memory allocation error"); jbe@0: return -1; jbe@0: } jbe@0: worker->pool = pool; jbe@0: { jbe@0: int controlfds[2]; jbe@0: int errorfds[2]; jbe@0: if (socketpair(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC, 0, controlfds)) { jbe@0: moonbr_log(LOG_ERR, "Could not create control socket pair for communcation with child process: %s", strerror(errno)); jbe@0: free(worker); jbe@0: return -1; jbe@0: } jbe@0: if (socketpair(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC, 0, errorfds)) { jbe@0: moonbr_log(LOG_ERR, "Could not create socket pair to redirect stderr of child process: %s", strerror(errno)); jbe@0: close(controlfds[0]); jbe@0: close(controlfds[1]); jbe@0: free(worker); jbe@0: return -1; jbe@0: } jbe@0: if (moonbr_logfile && fflush(moonbr_logfile)) { jbe@0: moonbr_log(LOG_CRIT, "Could not flush log file prior to forking: %s", strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: worker->pid = fork(); jbe@0: if (worker->pid == -1) { jbe@0: moonbr_log(LOG_ERR, "Could not fork: %s", strerror(errno)); jbe@0: close(controlfds[0]); jbe@0: close(controlfds[1]); jbe@0: close(errorfds[0]); jbe@0: close(errorfds[1]); jbe@0: free(worker); jbe@0: return -1; jbe@0: } else if (!worker->pid) { jbe@0: moonbr_pstate = MOONBR_PSTATE_FORKED; jbe@0: #ifdef MOONBR_LUA_PANIC_BUG_WORKAROUND jbe@0: lua_pushliteral(L, "Failed to pass error message due to bug in Lua panic handler (hint: not enough memory?)"); jbe@0: #endif jbe@0: moonbr_memory_limit = pool->memory_limit; jbe@0: if (moonbr_pidfh && pidfile_close(moonbr_pidfh)) { jbe@0: moonbr_child_emergency_error(errorfds[1], "Could not close PID file in forked child process"); jbe@0: } jbe@0: if (moonbr_logfile && moonbr_logfile != stderr && fclose(moonbr_logfile)) { jbe@0: moonbr_child_emergency_error(errorfds[1], "Could not close log file in forked child process"); jbe@0: } jbe@0: if (dup2(errorfds[1], MOONBR_FD_STDERR) == -1) { jbe@0: moonbr_child_emergency_error(errorfds[1], "Could not duplicate socket to stderr file descriptor"); jbe@0: } jbe@0: if (dup2(controlfds[1], MOONBR_FD_CONTROL) == -1) { jbe@0: moonbr_child_emergency_error(errorfds[1], "Could not duplicate control socket"); jbe@0: } jbe@0: closefrom(MOONBR_FD_END); jbe@0: moonbr_child_run(pool, L); jbe@0: } jbe@0: if (moonbr_stat) { jbe@0: moonbr_log(LOG_INFO, "Created new worker in pool #%i with PID %i", worker->pool->poolnum, (int)worker->pid); jbe@0: } jbe@0: worker->controlfd = controlfds[0]; jbe@0: worker->errorfd = errorfds[0]; jbe@0: if (close(controlfds[1]) && errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Could not close opposite end of control file descriptor after forking"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: if (close(errorfds[1]) && errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Could not close opposite end of control file descriptor after forking"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: worker->prev_worker = pool->last_worker; jbe@0: if (worker->prev_worker) worker->prev_worker->next_worker = worker; jbe@0: else pool->first_worker = worker; jbe@0: pool->last_worker = worker; jbe@0: pool->unassigned_worker_count++; jbe@0: pool->total_worker_count++; jbe@0: pool->worker_count_stat = 1; jbe@0: moonbr_poll_refresh_needed = 1; jbe@0: return 0; /* return zero only in case of success */ jbe@0: } jbe@0: jbe@0: jbe@0: /*** Functions for queues of 'struct moonbr_listener' ***/ jbe@0: jbe@0: /* Appends a 'struct moonbr_listener' to the queue of idle listeners and registers it for poll() */ jbe@0: static void moonbr_add_idle_listener(struct moonbr_listener *listener) { jbe@0: listener->prev_listener = listener->pool->last_idle_listener; jbe@0: if (listener->prev_listener) listener->prev_listener->next_listener = listener; jbe@0: else listener->pool->first_idle_listener = listener; jbe@0: listener->pool->last_idle_listener = listener; jbe@0: if (listener->pollidx != -1) moonbr_poll_fds[listener->pollidx].events |= POLLIN; jbe@0: } jbe@0: jbe@0: /* Removes a 'struct moonbr_listener' from the queue of idle listeners and unregisters it from poll() */ jbe@0: static void moonbr_remove_idle_listener(struct moonbr_listener *listener) { jbe@0: if (listener->prev_listener) listener->prev_listener->next_listener = listener->next_listener; jbe@0: else listener->pool->first_idle_listener = listener->next_listener; jbe@0: if (listener->next_listener) listener->next_listener->prev_listener = listener->prev_listener; jbe@0: else listener->pool->last_idle_listener = listener->prev_listener; jbe@0: listener->prev_listener = NULL; jbe@0: listener->next_listener = NULL; jbe@0: if (listener->pollidx != -1) moonbr_poll_fds[listener->pollidx].events &= ~POLLIN; jbe@0: } jbe@0: jbe@0: /* Adds a listener to the queue of connected listeners (i.e. waiting to have their incoming connection accepted) */ jbe@0: static void moonbr_add_connected_listener(struct moonbr_listener *listener) { jbe@0: listener->prev_listener = listener->pool->last_connected_listener; jbe@0: if (listener->prev_listener) listener->prev_listener->next_listener = listener; jbe@0: else listener->pool->first_connected_listener = listener; jbe@0: listener->pool->last_connected_listener = listener; jbe@0: } jbe@0: jbe@0: /* Removes and returns the first connected listener in the queue */ jbe@0: static struct moonbr_listener *moonbr_pop_connected_listener(struct moonbr_pool *pool) { jbe@0: struct moonbr_listener *listener = pool->first_connected_listener; jbe@0: listener->pool->first_connected_listener = listener->next_listener; jbe@0: if (listener->pool->first_connected_listener) listener->pool->first_connected_listener->prev_listener = NULL; jbe@0: else listener->pool->last_connected_listener = NULL; jbe@0: listener->next_listener = NULL; jbe@0: return listener; jbe@0: } jbe@0: jbe@0: jbe@0: /*** Functions to handle polling ***/ jbe@0: jbe@0: /* Returns an index to a new initialized entry in moonbr_poll_fds[] */ jbe@0: int moonbr_poll_fds_nextindex() { jbe@0: if (moonbr_poll_fds_count >= moonbr_poll_fds_bufsize) { jbe@0: if (moonbr_poll_fds_bufsize) moonbr_poll_fds_bufsize *= 2; jbe@0: else moonbr_poll_fds_bufsize = 1; jbe@0: moonbr_poll_fds = realloc( jbe@0: moonbr_poll_fds, moonbr_poll_fds_bufsize * sizeof(struct pollfd) jbe@0: ); jbe@0: if (!moonbr_poll_fds) { jbe@0: moonbr_log(LOG_CRIT, "Memory allocation error"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: moonbr_poll_fds[moonbr_poll_fds_count] = (struct pollfd){0, }; jbe@0: return moonbr_poll_fds_count++; jbe@0: } jbe@0: jbe@0: /* Returns an index to a new initialized entry in moonbr_poll_workers[] */ jbe@0: int moonbr_poll_workers_nextindex() { jbe@0: if (moonbr_poll_worker_count >= moonbr_poll_workers_bufsize) { jbe@0: if (moonbr_poll_workers_bufsize) moonbr_poll_workers_bufsize *= 2; jbe@0: else moonbr_poll_workers_bufsize = 1; jbe@0: moonbr_poll_workers = realloc( jbe@0: moonbr_poll_workers, moonbr_poll_workers_bufsize * sizeof(struct moonbr_poll_worker) jbe@0: ); jbe@0: if (!moonbr_poll_workers) { jbe@0: moonbr_log(LOG_CRIT, "Memory allocation error"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: moonbr_poll_workers[moonbr_poll_worker_count] = (struct moonbr_poll_worker){0, }; jbe@0: return moonbr_poll_worker_count++; jbe@0: } jbe@0: jbe@0: /* Queues all listeners as idle, and initializes static part of moonbr_poll_fds[], which is passed to poll() */ jbe@0: static void moonbr_poll_init() { jbe@0: if (socketpair( jbe@0: PF_LOCAL, jbe@0: SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, jbe@0: 0, jbe@0: moonbr_poll_signalfds jbe@0: )) { jbe@0: moonbr_log(LOG_CRIT, "Could not create socket pair for signal delivery during polling: %s", strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: { jbe@0: int j = moonbr_poll_fds_nextindex(); jbe@0: struct pollfd *pollfd = &moonbr_poll_fds[j]; jbe@0: pollfd->fd = moonbr_poll_signalfd_read; jbe@0: pollfd->events = POLLIN; jbe@0: } jbe@0: { jbe@0: struct moonbr_pool *pool; jbe@0: for (pool=moonbr_first_pool; pool; pool=pool->next_pool) { jbe@0: int i; jbe@0: for (i=0; ilistener_count; i++) { jbe@0: struct moonbr_listener *listener = &pool->listener[i]; jbe@0: if (listener->listenfd != -1) { jbe@0: int j = moonbr_poll_fds_nextindex(); jbe@0: listener->pollidx = j; jbe@0: moonbr_poll_fds[j].fd = listener->listenfd; jbe@0: } jbe@0: moonbr_add_idle_listener(listener); jbe@0: } jbe@0: } jbe@0: } jbe@0: moonbr_poll_fds_static_count = moonbr_poll_fds_count; /* remember size of static part of array */ jbe@0: } jbe@0: jbe@0: /* Disables polling of all listeners (required for clean shutdown) */ jbe@0: static void moonbr_poll_shutdown() { jbe@0: int i; jbe@0: for (i=1; inext_pool) { jbe@0: struct moonbr_worker *worker; jbe@0: for (worker=pool->first_worker; worker; worker=worker->next_worker) { jbe@0: if (worker->controlfd != -1) { jbe@0: int j = moonbr_poll_fds_nextindex(); jbe@0: int k = moonbr_poll_workers_nextindex(); jbe@0: struct pollfd *pollfd = &moonbr_poll_fds[j]; jbe@0: struct moonbr_poll_worker *poll_worker = &moonbr_poll_workers[k]; jbe@0: pollfd->fd = worker->controlfd; jbe@0: pollfd->events = POLLIN; jbe@0: poll_worker->channel = MOONBR_POLL_WORKER_CONTROLCHANNEL; jbe@0: poll_worker->worker = worker; jbe@0: } jbe@0: if (worker->errorfd != -1) { jbe@0: int j = moonbr_poll_fds_nextindex(); jbe@0: int k = moonbr_poll_workers_nextindex(); jbe@0: struct pollfd *pollfd = &moonbr_poll_fds[j]; jbe@0: struct moonbr_poll_worker *poll_worker = &moonbr_poll_workers[k]; jbe@0: pollfd->fd = worker->errorfd; jbe@0: pollfd->events = POLLIN; jbe@0: poll_worker->channel = MOONBR_POLL_WORKER_ERRORCHANNEL; jbe@0: poll_worker->worker = worker; jbe@0: } jbe@0: } jbe@0: } jbe@0: } jbe@0: } jbe@0: jbe@0: /* resets socket and 'revents' field of moonbr_poll_fds[] for signal delivery just before poll() is called */ jbe@0: static void moonbr_poll_reset_signal() { jbe@0: ssize_t readcount; jbe@0: char buf[1]; jbe@0: moonbr_poll_fds[0].revents = 0; jbe@0: while ((readcount = read(moonbr_poll_signalfd_read, buf, 1)) < 0) { jbe@0: if (errno == EAGAIN) break; jbe@0: if (errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Error while reading from signal delivery socket: %s", strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: if (!readcount) { jbe@0: moonbr_log(LOG_CRIT, "Unexpected EOF when reading from signal delivery socket: %s", strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: jbe@0: jbe@0: /*** Shutdown initiation ***/ jbe@0: jbe@0: /* Sets global variable 'moonbr_shutdown_in_progress', closes listeners, and demands worker termination */ jbe@0: static void moonbr_initiate_shutdown() { jbe@0: struct moonbr_pool *pool; jbe@294: int i; jbe@294: struct moonbr_worker *worker; jbe@0: if (moonbr_shutdown_in_progress) { jbe@0: moonbr_log(LOG_NOTICE, "Shutdown already in progress"); jbe@0: return; jbe@0: } jbe@0: moonbr_shutdown_in_progress = 1; jbe@0: moonbr_log(LOG_NOTICE, "Initiate shutdown"); jbe@0: for (pool = moonbr_first_pool; pool; pool = pool->next_pool) { jbe@0: for (i=0; ilistener_count; i++) { jbe@0: struct moonbr_listener *listener = &pool->listener[i]; jbe@0: if (listener->listenfd != -1) { jbe@0: if (close(listener->listenfd) && errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Could not close listening socket: %s", strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: } jbe@291: for (worker=pool->first_worker; worker; worker=worker->next_worker) { jbe@291: if (moonbr_debug) { jbe@291: moonbr_log(LOG_DEBUG, "Sending SIGTERM to child with PID %i", (int)worker->pid); jbe@291: } jbe@291: if (kill(worker->pid, SIGTERM)) { jbe@291: moonbr_log(LOG_ERR, "Error while terminating child process: %s", strerror(errno)); jbe@291: } jbe@195: } jbe@195: } jbe@294: moonbr_poll_shutdown(); /* avoids loops due to error condition when polling closed listeners */ jbe@0: } jbe@0: jbe@0: jbe@128: /*** Functions to handle previously created 'struct moonbr_worker' structures ***/ jbe@128: jbe@128: #define moonbr_try_destroy_worker_stat(str, field) \ jbe@128: moonbr_log(LOG_INFO, "Resource usage in pool #%i for PID %i: " str " %li", worker->pool->poolnum, (int)worker->pid, (long)childusage.field); jbe@128: jbe@128: /* Destroys a worker structure if socket connections have been closed and child process has terminated */ jbe@128: static int moonbr_try_destroy_worker(struct moonbr_worker *worker) { jbe@128: if (worker->controlfd != -1 || worker->errorfd != -1) return MOONBR_DESTROY_NONE; jbe@128: { jbe@128: int childstatus; jbe@128: struct rusage childusage; jbe@128: { jbe@128: pid_t waitedpid; jbe@128: while ( jbe@128: (waitedpid = wait4(worker->pid, &childstatus, WNOHANG, &childusage)) == -1 jbe@128: ) { jbe@128: if (errno != EINTR) { jbe@128: moonbr_log(LOG_CRIT, "Error in wait4() call: %s", strerror(errno)); jbe@128: moonbr_terminate_error(); jbe@128: } jbe@128: } jbe@128: if (!waitedpid) return 0; /* return 0 if worker couldn't be destroyed */ jbe@128: if (waitedpid != worker->pid) { jbe@128: moonbr_log(LOG_CRIT, "Wrong PID returned by wait4() call"); jbe@128: moonbr_terminate_error(); jbe@128: } jbe@128: } jbe@128: if (WIFEXITED(childstatus)) { jbe@128: if (WEXITSTATUS(childstatus) || moonbr_stat) { jbe@128: moonbr_log( jbe@128: WEXITSTATUS(childstatus) ? LOG_WARNING : LOG_INFO, jbe@128: "Child process in pool #%i with PID %i returned with exit code %i", worker->pool->poolnum, (int)worker->pid, WEXITSTATUS(childstatus) jbe@128: ); jbe@128: } jbe@128: } else if (WIFSIGNALED(childstatus)) { jbe@128: if (WCOREDUMP(childstatus)) { jbe@128: moonbr_log(LOG_ERR, "Child process in pool #%i with PID %i died from signal %i (core dump was created)", worker->pool->poolnum, (int)worker->pid, WTERMSIG(childstatus)); jbe@128: } else if (WTERMSIG(childstatus) == SIGALRM) { jbe@128: moonbr_log(LOG_WARNING, "Child process in pool #%i with PID %i exited prematurely due to timeout", worker->pool->poolnum, (int)worker->pid); jbe@128: } else { jbe@128: moonbr_log(LOG_ERR, "Child process in pool #%i with PID %i died from signal %i", worker->pool->poolnum, (int)worker->pid, WTERMSIG(childstatus)); jbe@128: } jbe@128: } else { jbe@128: moonbr_log(LOG_CRIT, "Illegal exit status from child process in pool #%i with PID %i", worker->pool->poolnum, (int)worker->pid); jbe@128: moonbr_terminate_error(); jbe@128: } jbe@128: if (moonbr_stat) { jbe@128: moonbr_log(LOG_INFO, "Resource usage in pool #%i for PID %i: user time %s", worker->pool->poolnum, (int)worker->pid, moonbr_format_timeval(&childusage.ru_utime)); jbe@128: moonbr_log(LOG_INFO, "Resource usage in pool #%i for PID %i: system time %s", worker->pool->poolnum, (int)worker->pid, moonbr_format_timeval(&childusage.ru_stime)); jbe@128: moonbr_try_destroy_worker_stat("max resident set size", ru_maxrss); jbe@128: moonbr_try_destroy_worker_stat("integral shared memory size", ru_ixrss); jbe@128: moonbr_try_destroy_worker_stat("integral unshared data", ru_idrss); jbe@128: moonbr_try_destroy_worker_stat("integral unshared stack", ru_isrss); jbe@128: moonbr_try_destroy_worker_stat("page replaims", ru_minflt); jbe@128: moonbr_try_destroy_worker_stat("page faults", ru_majflt); jbe@128: moonbr_try_destroy_worker_stat("swaps", ru_nswap); jbe@128: moonbr_try_destroy_worker_stat("block input operations", ru_inblock); jbe@128: moonbr_try_destroy_worker_stat("block output operations", ru_oublock); jbe@128: moonbr_try_destroy_worker_stat("messages sent", ru_msgsnd); jbe@128: moonbr_try_destroy_worker_stat("messages received", ru_msgrcv); jbe@128: moonbr_try_destroy_worker_stat("signals received", ru_nsignals); jbe@128: moonbr_try_destroy_worker_stat("voluntary context switches", ru_nvcsw); jbe@128: moonbr_try_destroy_worker_stat("involuntary context switches", ru_nivcsw); jbe@128: } jbe@128: } jbe@128: { jbe@128: int retval = ( jbe@128: (worker->idle || worker->assigned) ? jbe@128: MOONBR_DESTROY_IDLE_OR_ASSIGNED : jbe@128: MOONBR_DESTROY_PREPARE jbe@128: ); jbe@302: if (worker->main && !moonbr_shutdown_in_progress) moonbr_initiate_shutdown(); jbe@128: if (worker->prev_worker) worker->prev_worker->next_worker = worker->next_worker; jbe@128: else worker->pool->first_worker = worker->next_worker; jbe@128: if (worker->next_worker) worker->next_worker->prev_worker = worker->prev_worker; jbe@128: else worker->pool->last_worker = worker->prev_worker; jbe@128: if (worker->idle) { jbe@128: if (worker->prev_idle_worker) worker->prev_idle_worker->next_idle_worker = worker->next_idle_worker; jbe@128: else worker->pool->first_idle_worker = worker->next_idle_worker; jbe@128: if (worker->next_idle_worker) worker->next_idle_worker->prev_idle_worker = worker->prev_idle_worker; jbe@128: else worker->pool->last_idle_worker = worker->prev_idle_worker; jbe@128: worker->pool->idle_worker_count--; jbe@128: } jbe@128: if (!worker->assigned) worker->pool->unassigned_worker_count--; jbe@128: worker->pool->total_worker_count--; jbe@128: worker->pool->worker_count_stat = 1; jbe@128: if (worker->errorlinebuf) free(worker->errorlinebuf); jbe@128: free(worker); jbe@128: return retval; jbe@128: } jbe@128: } jbe@128: jbe@128: /* Marks a worker as idle and stores it in a queue, optionally setting 'idle_expiration' value */ jbe@128: static void moonbr_add_idle_worker(struct moonbr_worker *worker) { jbe@128: worker->prev_idle_worker = worker->pool->last_idle_worker; jbe@128: if (worker->prev_idle_worker) worker->prev_idle_worker->next_idle_worker = worker; jbe@128: else worker->pool->first_idle_worker = worker; jbe@128: worker->pool->last_idle_worker = worker; jbe@128: worker->idle = 1; jbe@128: worker->pool->idle_worker_count++; jbe@128: if (worker->assigned) { jbe@128: worker->assigned = 0; jbe@128: worker->pool->unassigned_worker_count++; jbe@128: } jbe@128: worker->pool->worker_count_stat = 1; jbe@128: if (timerisset(&worker->pool->idle_timeout)) { jbe@128: struct timeval now; jbe@128: moonbr_now(&now); jbe@128: timeradd(&now, &worker->pool->idle_timeout, &worker->idle_expiration); jbe@128: } jbe@128: } jbe@128: jbe@128: /* Pops a worker from the queue of idle workers (idle queue must not be empty) */ jbe@128: static struct moonbr_worker *moonbr_pop_idle_worker(struct moonbr_pool *pool) { jbe@128: struct moonbr_worker *worker; jbe@128: worker = pool->first_idle_worker; jbe@128: pool->first_idle_worker = worker->next_idle_worker; jbe@128: if (pool->first_idle_worker) pool->first_idle_worker->prev_idle_worker = NULL; jbe@128: else pool->last_idle_worker = NULL; jbe@128: worker->next_idle_worker = NULL; jbe@128: worker->idle = 0; jbe@128: worker->pool->idle_worker_count--; jbe@128: worker->assigned = 1; jbe@128: worker->pool->unassigned_worker_count--; jbe@128: worker->pool->worker_count_stat = 1; jbe@128: return worker; jbe@128: } jbe@128: jbe@128: jbe@0: /*** Functions to communicate with child processes ***/ jbe@0: jbe@0: /* Tells child process to terminate */ jbe@0: static void moonbr_terminate_idle_worker(struct moonbr_worker *worker) { jbe@0: moonbr_send_control_message(worker, MOONBR_COMMAND_TERMINATE, -1, NULL); jbe@0: } jbe@0: jbe@0: /* Handles status messages from child process */ jbe@0: static void moonbr_read_controlchannel(struct moonbr_worker *worker) { jbe@0: char controlmsg; jbe@0: { jbe@0: ssize_t bytes_read; jbe@0: while ((bytes_read = read(worker->controlfd, &controlmsg, 1)) <= 0) { jbe@0: if (bytes_read == 0 || errno == ECONNRESET) { jbe@0: moonbr_log(LOG_WARNING, "Child process in pool #%i with PID %i unexpectedly closed control socket", worker->pool->poolnum, (int)worker->pid); jbe@0: if (close(worker->controlfd) && errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Error while closing control socket to child process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: worker->controlfd = -1; jbe@0: moonbr_poll_refresh_needed = 1; jbe@0: return; jbe@0: } jbe@0: if (errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Unexpected error while reading control socket from child process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: } jbe@0: if (worker->idle) { jbe@0: moonbr_log(LOG_CRIT, "Unexpected data from supposedly idle child process in pool #%i with PID %i", worker->pool->poolnum, (int)worker->pid); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: if (moonbr_debug) { jbe@0: moonbr_log(LOG_DEBUG, "Received control message from child in pool #%i with PID %i: \"%c\"", worker->pool->poolnum, (int)worker->pid, (int)controlmsg); jbe@0: } jbe@0: switch (controlmsg) { jbe@0: case MOONBR_STATUS_IDLE: jbe@0: if (moonbr_stat) { jbe@0: moonbr_log(LOG_INFO, "Child process in pool #%i with PID %i reports as idle", worker->pool->poolnum, (int)worker->pid); jbe@0: } jbe@0: moonbr_add_idle_worker(worker); jbe@0: break; jbe@0: case MOONBR_STATUS_GOODBYE: jbe@0: if (moonbr_stat) { jbe@0: moonbr_log(LOG_INFO, "Child process in pool #%i with PID %i announced termination", worker->pool->poolnum, (int)worker->pid); jbe@0: } jbe@0: if (close(worker->controlfd) && errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Error while closing control socket to child process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: worker->controlfd = -1; jbe@0: moonbr_poll_refresh_needed = 1; jbe@0: break; jbe@0: default: jbe@0: moonbr_log(LOG_CRIT, "Received illegal data (\"%c\") while reading control socket from child process in pool #%i with PID %i", (int)controlmsg, worker->pool->poolnum, (int)worker->pid); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: jbe@0: /* Handles stderr stream from child process */ jbe@0: static void moonbr_read_errorchannel(struct moonbr_worker *worker) { jbe@0: char staticbuf[MOONBR_MAXERRORLINELEN+1]; jbe@0: char *buf = worker->errorlinebuf; jbe@0: if (!buf) buf = staticbuf; jbe@0: { jbe@0: ssize_t bytes_read; jbe@0: while ( jbe@0: (bytes_read = read( jbe@0: worker->errorfd, jbe@0: buf + worker->errorlinelen, jbe@0: MOONBR_MAXERRORLINELEN+1 - worker->errorlinelen jbe@0: )) <= 0 jbe@0: ) { jbe@0: if (bytes_read == 0 || errno == ECONNRESET) { jbe@0: if (moonbr_debug) { jbe@0: moonbr_log(LOG_DEBUG, "Child process in pool #%i with PID %i closed stderr socket", worker->pool->poolnum, (int)worker->pid); jbe@0: } jbe@0: if (close(worker->errorfd) && errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Error while closing stderr socket to child process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: worker->errorfd = -1; jbe@0: moonbr_poll_refresh_needed = 1; jbe@0: break; jbe@0: } jbe@0: if (errno != EINTR) { jbe@0: moonbr_log(LOG_CRIT, "Unexpected error while reading stderr from child process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: worker->errorlinelen += bytes_read; jbe@0: } jbe@0: { jbe@0: int i; jbe@0: for (i=0; ierrorlinelen; i++) { jbe@0: if (buf[i] == '\n') buf[i] = 0; jbe@0: if (!buf[i]) { jbe@0: if (worker->errorlineovf) { jbe@0: worker->errorlineovf = 0; jbe@0: } else { jbe@283: moonbr_log(LOG_WARNING, "[Pool #%i, PID %i] %s", worker->pool->poolnum, (int)worker->pid, buf); jbe@0: } jbe@0: worker->errorlinelen -= i+1; jbe@0: memmove(buf, buf+i+1, worker->errorlinelen); jbe@0: i = -1; jbe@0: } jbe@0: } jbe@0: if (i > MOONBR_MAXERRORLINELEN) { jbe@0: buf[MOONBR_MAXERRORLINELEN] = 0; jbe@0: if (!worker->errorlineovf) { jbe@283: moonbr_log(LOG_WARNING, "[Pool #%i, PID %i](line has been truncated) %s", worker->pool->poolnum, (int)worker->pid, buf); jbe@0: } jbe@0: worker->errorlinelen = 0; jbe@0: worker->errorlineovf = 1; jbe@0: } jbe@0: } jbe@0: if (!worker->errorlinebuf && worker->errorlinelen) { /* allocate buffer on heap only if necessary */ jbe@0: worker->errorlinebuf = malloc((MOONBR_MAXERRORLINELEN+1) * sizeof(char)); jbe@0: if (!worker->errorlinebuf) { jbe@0: moonbr_log(LOG_CRIT, "Memory allocation error"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: memcpy(worker->errorlinebuf, staticbuf, worker->errorlinelen); jbe@0: } jbe@0: } jbe@0: jbe@0: jbe@0: /*** Handler for incoming connections ***/ jbe@0: jbe@0: /* Accepts one or more incoming connections on listener socket and passes it to worker(s) popped from idle queue */ jbe@0: static void moonbr_connect(struct moonbr_pool *pool) { jbe@0: struct moonbr_listener *listener = moonbr_pop_connected_listener(pool); jbe@0: struct moonbr_worker *worker; jbe@274: if (listener->proto == MOONBR_PROTO_INTERVAL) { jbe@127: worker = moonbr_pop_idle_worker(pool); jbe@274: if (listener->type_specific.interval.main) { jbe@274: if (moonbr_stat) { jbe@274: moonbr_log(LOG_INFO, "Dispatching main thread \"%s\" of pool #%i to PID %i", listener->type_specific.interval.name, listener->pool->poolnum, (int)worker->pid); jbe@274: } jbe@274: worker->main = 1; jbe@274: } else { jbe@274: if (moonbr_stat) { jbe@274: moonbr_log(LOG_INFO, "Dispatching interval timer \"%s\" of pool #%i to PID %i", listener->type_specific.interval.name, listener->pool->poolnum, (int)worker->pid); jbe@274: } jbe@274: worker->restart_interval_listener = listener; jbe@127: } jbe@127: moonbr_send_control_message(worker, MOONBR_COMMAND_CONNECT, -1, listener); jbe@274: /* do not push listener to queue of idle listeners (yet) */ jbe@125: } else { jbe@125: int peerfd; jbe@0: do { jbe@214: #if defined(__linux__) && !defined(_GNU_SOURCE) jbe@212: peerfd = accept(listener->listenfd, NULL, NULL); jbe@212: if (peerfd != -1) { jbe@212: if (fcntl(peerfd, F_SETFD, FD_CLOEXEC) == -1) { jbe@212: moonbr_log(LOG_ERR, "Error in fcntl() call: %s", strerror(errno)); jbe@212: moonbr_terminate_error(); jbe@212: } jbe@212: } jbe@214: #else jbe@214: peerfd = accept4(listener->listenfd, NULL, NULL, SOCK_CLOEXEC); jbe@214: #endif jbe@0: if (peerfd == -1) { jbe@0: if (errno == EWOULDBLOCK) { jbe@0: break; jbe@0: } else if (errno == ECONNABORTED) { jbe@125: moonbr_log(LOG_WARNING, "Connection aborted before accepting it"); jbe@0: break; jbe@0: } else if (errno != EINTR) { jbe@0: moonbr_log(LOG_ERR, "Could not accept socket connection: %s", strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } else { jbe@0: worker = moonbr_pop_idle_worker(pool); jbe@0: if (moonbr_stat) { jbe@125: moonbr_log(LOG_INFO, "Dispatching connection for pool #%i to PID %i", listener->pool->poolnum, (int)worker->pid); jbe@0: } jbe@124: moonbr_send_control_message(worker, MOONBR_COMMAND_CONNECT, peerfd, listener); jbe@0: if (close(peerfd) && errno != EINTR) { jbe@0: moonbr_log(LOG_ERR, "Could not close incoming socket connection in parent process: %s", strerror(errno)); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: } while (pool->first_idle_worker); jbe@0: moonbr_add_idle_listener(listener); jbe@0: } jbe@0: } jbe@0: jbe@0: jbe@0: /*** Functions to initialize and restart interval timers ***/ jbe@0: jbe@0: /* Initializes all interval timers */ jbe@0: static void moonbr_interval_initialize() { jbe@0: struct timeval now; jbe@0: struct moonbr_pool *pool; jbe@0: moonbr_now(&now); jbe@0: for (pool=moonbr_first_pool; pool; pool=pool->next_pool) { jbe@0: int i; jbe@0: for (i=0; ilistener_count; i++) { jbe@0: struct moonbr_listener *listener = &pool->listener[i]; jbe@0: if (listener->proto == MOONBR_PROTO_INTERVAL) { jbe@0: timeradd( jbe@0: &now, jbe@125: &listener->type_specific.interval.delay, jbe@125: &listener->type_specific.interval.wakeup jbe@0: ); jbe@0: } jbe@0: } jbe@0: } jbe@0: } jbe@0: jbe@0: /* If necessary, restarts interval timers and queues interval listener as idle after a worker changed status */ jbe@0: static void moonbr_interval_restart( jbe@0: struct moonbr_worker *worker, jbe@0: struct timeval *now /* passed to synchronize with moonbr_run() function */ jbe@0: ) { jbe@0: struct moonbr_listener *listener = worker->restart_interval_listener; jbe@0: if (listener) { jbe@0: moonbr_add_idle_listener(listener); jbe@0: worker->restart_interval_listener = NULL; jbe@125: if (listener->type_specific.interval.strict) { jbe@0: timeradd( jbe@125: &listener->type_specific.interval.wakeup, jbe@125: &listener->type_specific.interval.delay, jbe@125: &listener->type_specific.interval.wakeup jbe@0: ); jbe@125: if (timercmp(&listener->type_specific.interval.wakeup, now, <)) { jbe@125: listener->type_specific.interval.wakeup = *now; jbe@0: } jbe@0: } else { jbe@0: timeradd( jbe@0: now, jbe@125: &listener->type_specific.interval.delay, jbe@125: &listener->type_specific.interval.wakeup jbe@0: ); jbe@0: } jbe@0: } jbe@0: } jbe@0: jbe@0: jbe@0: /*** Main loop and helper functions ***/ jbe@0: jbe@0: /* Stores the earliest required wakeup time in 'wait' variable */ jbe@0: static void moonbr_calc_wait(struct timeval *wait, struct timeval *wakeup) { jbe@0: if (!timerisset(wait) || timercmp(wakeup, wait, <)) *wait = *wakeup; jbe@0: } jbe@0: jbe@0: /* Main loop of Moonbridge system (including initialization of signal handlers and polling structures) */ jbe@0: static void moonbr_run(lua_State *L) { jbe@0: struct timeval now; jbe@0: struct moonbr_pool *pool; jbe@0: struct moonbr_worker *worker; jbe@0: struct moonbr_worker *next_worker; /* needed when worker is removed during iteration of workers */ jbe@0: struct moonbr_listener *listener; jbe@0: struct moonbr_listener *next_listener; /* needed when listener is removed during iteration of listeners */ jbe@0: int i; jbe@0: moonbr_poll_init(); /* must be executed before moonbr_signal_init() */ jbe@0: moonbr_signal_init(); jbe@0: moonbr_interval_initialize(); jbe@0: moonbr_pstate = MOONBR_PSTATE_RUNNING; jbe@0: while (1) { jbe@0: struct timeval wait = {0, }; /* point in time when premature wakeup of poll() is required */ jbe@0: if (moonbr_cond_interrupt) { jbe@0: moonbr_log(LOG_WARNING, "Fast shutdown requested"); jbe@0: moonbr_terminate(MOONBR_EXITCODE_GRACEFUL); jbe@0: } jbe@276: if (moonbr_cond_terminate) { jbe@276: moonbr_initiate_shutdown(); jbe@276: moonbr_cond_terminate = 0; jbe@276: } jbe@0: moonbr_cond_child = 0; /* must not be reset between moonbr_try_destroy_worker() and poll() */ jbe@0: moonbr_now(&now); jbe@0: for (pool=moonbr_first_pool; pool; pool=pool->next_pool) { jbe@0: int terminated_worker_count = 0; /* allows shortcut for new worker creation */ jbe@0: /* terminate idle workers when expired */ jbe@0: if (timerisset(&pool->idle_timeout)) { jbe@0: while ((worker = pool->first_idle_worker) != NULL) { jbe@0: if (timercmp(&worker->idle_expiration, &now, >)) break; jbe@0: moonbr_pop_idle_worker(pool); jbe@0: moonbr_terminate_idle_worker(worker); jbe@0: } jbe@0: } jbe@0: /* mark listeners as connected when incoming connection is pending */ jbe@0: for (listener=pool->first_idle_listener; listener; listener=next_listener) { jbe@0: next_listener = listener->next_listener; /* extra variable necessary due to changing list */ jbe@0: if (listener->pollidx != -1) { jbe@0: if (moonbr_poll_fds[listener->pollidx].revents) { jbe@0: moonbr_poll_fds[listener->pollidx].revents = 0; jbe@0: moonbr_remove_idle_listener(listener); jbe@0: moonbr_add_connected_listener(listener); jbe@0: } jbe@127: } else if ( jbe@127: listener->proto != MOONBR_PROTO_INTERVAL || jbe@127: !timercmp(&listener->type_specific.interval.wakeup, &now, >) jbe@127: ) { jbe@127: moonbr_remove_idle_listener(listener); jbe@127: moonbr_add_connected_listener(listener); jbe@0: } jbe@0: } jbe@0: /* process input from child processes */ jbe@0: for (i=0; ichannel) { jbe@0: case MOONBR_POLL_WORKER_CONTROLCHANNEL: jbe@0: moonbr_read_controlchannel(poll_worker->worker); jbe@0: moonbr_interval_restart(poll_worker->worker, &now); jbe@0: break; jbe@0: case MOONBR_POLL_WORKER_ERRORCHANNEL: jbe@0: moonbr_read_errorchannel(poll_worker->worker); jbe@0: break; jbe@0: } jbe@0: } jbe@0: } jbe@0: /* collect dead child processes */ jbe@0: for (worker=pool->first_worker; worker; worker=next_worker) { jbe@0: next_worker = worker->next_worker; /* extra variable necessary due to changing list */ jbe@0: switch (moonbr_try_destroy_worker(worker)) { jbe@0: case MOONBR_DESTROY_PREPARE: jbe@0: pool->use_fork_error_wakeup = 1; jbe@0: break; jbe@0: case MOONBR_DESTROY_IDLE_OR_ASSIGNED: jbe@0: terminated_worker_count++; jbe@0: break; jbe@0: } jbe@0: } jbe@0: if (!moonbr_shutdown_in_progress) { jbe@130: /* connect listeners with idle workers */ jbe@0: while (pool->first_connected_listener && pool->first_idle_worker) { jbe@0: moonbr_connect(pool); jbe@0: } jbe@130: /* create new worker processes */ jbe@130: while ( jbe@130: pool->total_worker_count < pool->max_fork && ( jbe@130: pool->unassigned_worker_count < pool->pre_fork || jbe@130: pool->total_worker_count < pool->min_fork jbe@130: ) jbe@130: ) { jbe@130: if (pool->use_fork_error_wakeup) { jbe@130: if (timercmp(&pool->fork_error_wakeup, &now, >)) { jbe@130: moonbr_calc_wait(&wait, &pool->fork_error_wakeup); jbe@130: break; jbe@130: } jbe@130: } else { jbe@130: if (terminated_worker_count) { jbe@130: terminated_worker_count--; jbe@130: } else if (timercmp(&pool->fork_wakeup, &now, >)) { jbe@130: moonbr_calc_wait(&wait, &pool->fork_wakeup); jbe@130: break; jbe@130: } jbe@0: } jbe@130: if (moonbr_create_worker(pool, L)) { jbe@130: /* on error, enforce error delay */ jbe@130: timeradd(&now, &pool->fork_error_delay, &pool->fork_error_wakeup); jbe@130: pool->use_fork_error_wakeup = 1; jbe@130: moonbr_calc_wait(&wait, &pool->fork_error_wakeup); jbe@130: break; jbe@130: } else { jbe@130: /* normal fork delay on success */ jbe@130: timeradd(&now, &pool->fork_delay, &pool->fork_wakeup); jbe@130: timeradd(&now, &pool->fork_error_delay, &pool->fork_error_wakeup); jbe@130: pool->use_fork_error_wakeup = 0; /* gets set later if error occures during preparation */ jbe@130: } jbe@130: } jbe@130: /* terminate excessive worker processes */ jbe@130: while ( jbe@130: pool->total_worker_count > pool->min_fork && jbe@130: pool->idle_worker_count > pool->pre_fork jbe@130: ) { jbe@130: if (timerisset(&pool->exit_wakeup)) { jbe@130: if (timercmp(&pool->exit_wakeup, &now, >)) { jbe@130: moonbr_calc_wait(&wait, &pool->exit_wakeup); jbe@130: break; jbe@130: } jbe@130: moonbr_terminate_idle_worker(moonbr_pop_idle_worker(pool)); jbe@130: timeradd(&now, &pool->exit_delay, &pool->exit_wakeup); jbe@130: } else { jbe@130: timeradd(&now, &pool->exit_delay, &pool->exit_wakeup); jbe@0: break; jbe@0: } jbe@0: } jbe@130: if (!( jbe@130: pool->total_worker_count > pool->min_fork && jbe@130: pool->idle_worker_count > pool->pre_fork jbe@130: )) { jbe@130: timerclear(&pool->exit_wakeup); /* timer gets restarted later when there are excessive workers */ jbe@0: } jbe@0: } jbe@0: /* optionally output worker count stats */ jbe@0: if (moonbr_stat && pool->worker_count_stat) { jbe@0: pool->worker_count_stat = 0; jbe@0: moonbr_log( jbe@0: LOG_INFO, jbe@0: "Worker count for pool #%i: %i idle, %i assigned, %i total", jbe@0: pool->poolnum, pool->idle_worker_count, jbe@0: pool->total_worker_count - pool->unassigned_worker_count, jbe@0: pool->total_worker_count); jbe@0: } jbe@0: /* calculate wakeup time for interval listeners */ jbe@0: for (listener=pool->first_idle_listener; listener; listener=listener->next_listener) { jbe@0: if (listener->proto == MOONBR_PROTO_INTERVAL) { jbe@125: moonbr_calc_wait(&wait, &listener->type_specific.interval.wakeup); jbe@0: } jbe@0: } jbe@0: /* calculate wakeup time for idle workers (only first idle worker is significant) */ jbe@0: if (timerisset(&pool->idle_timeout) && pool->first_idle_worker) { jbe@0: moonbr_calc_wait(&wait, &pool->first_idle_worker->idle_expiration); jbe@0: } jbe@0: } jbe@130: /* terminate idle workers in case of shutdown and check if shutdown is complete */ jbe@0: if (moonbr_shutdown_in_progress) { jbe@195: int remaining = 0; jbe@0: for (pool=moonbr_first_pool; pool; pool=pool->next_pool) { jbe@195: while (pool->idle_worker_count) { jbe@195: moonbr_terminate_idle_worker(moonbr_pop_idle_worker(pool)); jbe@130: } jbe@195: if (pool->first_worker) remaining = 1; jbe@0: } jbe@195: if (!remaining) { jbe@0: moonbr_log(LOG_INFO, "All worker threads have terminated"); jbe@0: moonbr_terminate(MOONBR_EXITCODE_GRACEFUL); jbe@0: } jbe@0: } jbe@0: if (moonbr_poll_refresh_needed) moonbr_poll_refresh(); jbe@0: moonbr_cond_poll = 1; jbe@0: if (!moonbr_cond_child && !moonbr_cond_terminate && !moonbr_cond_interrupt) { jbe@0: int timeout; jbe@0: if (timerisset(&wait)) { jbe@0: if (timercmp(&wait, &now, <)) { jbe@0: moonbr_log(LOG_CRIT, "Internal error (should not happen): Future is in the past"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: timersub(&wait, &now, &wait); jbe@0: timeout = wait.tv_sec * 1000 + wait.tv_usec / 1000; jbe@0: } else { jbe@0: timeout = INFTIM; jbe@0: } jbe@0: if (moonbr_debug) { jbe@0: moonbr_log(LOG_DEBUG, "Waiting for I/O"); jbe@0: } jbe@0: poll(moonbr_poll_fds, moonbr_poll_fds_count, timeout); jbe@0: } else { jbe@0: if (moonbr_debug) { jbe@0: moonbr_log(LOG_DEBUG, "Do not wait for I/O"); jbe@0: } jbe@0: } jbe@0: moonbr_cond_poll = 0; jbe@0: moonbr_poll_reset_signal(); jbe@0: } jbe@0: } jbe@0: jbe@0: jbe@0: /*** Lua interface ***/ jbe@0: jbe@0: static int moonbr_lua_panic(lua_State *L) { jbe@0: const char *errmsg; jbe@0: errmsg = lua_tostring(L, -1); jbe@0: if (!errmsg) { jbe@0: if (lua_isnoneornil(L, -1)) errmsg = "(error message is nil)"; jbe@0: else errmsg = "(error message is not a string)"; jbe@0: } jbe@0: if (moonbr_pstate == MOONBR_PSTATE_FORKED) { jbe@0: fprintf(stderr, "Uncaught Lua error: %s\n", errmsg); jbe@0: exit(1); jbe@0: } else { jbe@0: moonbr_log(LOG_CRIT, "Uncaught Lua error: %s", errmsg); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: return 0; jbe@0: } jbe@0: jbe@0: static int moonbr_addtraceback(lua_State *L) { jbe@0: luaL_traceback(L, L, luaL_tolstring(L, 1, NULL), 1); jbe@0: return 1; jbe@0: } jbe@0: jbe@0: /* Memory allocator that allows limiting memory consumption */ jbe@0: static void *moonbr_alloc (void *ud, void *ptr, size_t osize, size_t nsize) { jbe@0: (void)ud; /* not used */ jbe@0: if (nsize == 0) { jbe@0: if (ptr) { jbe@0: moonbr_memory_usage -= osize; jbe@0: free(ptr); jbe@0: } jbe@0: return NULL; jbe@0: } else if (ptr) { jbe@0: if ( jbe@0: moonbr_memory_limit && jbe@0: nsize > osize && jbe@0: moonbr_memory_usage + (nsize - osize) > moonbr_memory_limit jbe@0: ) { jbe@0: return NULL; jbe@0: } else { jbe@0: ptr = realloc(ptr, nsize); jbe@0: if (ptr) moonbr_memory_usage += nsize - osize; jbe@0: } jbe@0: } else { jbe@0: if ( jbe@0: moonbr_memory_limit && jbe@0: moonbr_memory_usage + nsize > moonbr_memory_limit jbe@0: ) { jbe@0: return NULL; jbe@0: } else { jbe@0: ptr = realloc(ptr, nsize); jbe@0: if (ptr) moonbr_memory_usage += nsize; jbe@0: } jbe@0: } jbe@0: return ptr; jbe@0: } jbe@0: jbe@66: static int moonbr_lua_tonatural(lua_State *L, int idx) { jbe@66: int isnum; jbe@66: lua_Number n; jbe@66: n = lua_tonumberx(L, idx, &isnum); jbe@66: if (isnum && n>=0 && n=0 && n<=100000000) { jbe@66: value->tv_sec = n; jbe@66: value->tv_usec = 1e6 * (n - value->tv_sec); jbe@66: return 1; jbe@66: } else { jbe@66: return 0; jbe@66: } jbe@66: } jbe@66: jbe@0: static int moonbr_timeout(lua_State *L) { jbe@0: struct itimerval oldval; jbe@0: if (lua_isnoneornil(L, 1) && lua_isnoneornil(L, 2)) { jbe@0: getitimer(ITIMER_REAL, &oldval); jbe@0: } else { jbe@0: struct itimerval newval = {}; jbe@39: timerclear(&newval.it_interval); jbe@39: timerclear(&newval.it_value); jbe@0: if (lua_toboolean(L, 1)) { jbe@0: luaL_argcheck( jbe@0: L, moonbr_lua_totimeval(L, 1, &newval.it_value), 1, jbe@0: "interval in seconds expected" jbe@0: ); jbe@0: } jbe@0: if (lua_isnoneornil(L, 2)) { jbe@0: if (setitimer(ITIMER_REAL, &newval, &oldval)) { jbe@0: moonbr_log(LOG_CRIT, "Could not set ITIMER_REAL via setitimer()"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } else { jbe@0: getitimer(ITIMER_REAL, &oldval); jbe@39: if (!timerisset(&oldval.it_value)) { jbe@39: if (setitimer(ITIMER_REAL, &newval, NULL)) { jbe@39: moonbr_log(LOG_CRIT, "Could not set ITIMER_REAL via setitimer()"); jbe@39: moonbr_terminate_error(); jbe@39: } jbe@39: lua_call(L, lua_gettop(L) - 2, LUA_MULTRET); jbe@39: timerclear(&newval.it_value); jbe@39: if (setitimer(ITIMER_REAL, &newval, NULL)) { jbe@39: moonbr_log(LOG_CRIT, "Could not set ITIMER_REAL via setitimer()"); jbe@39: moonbr_terminate_error(); jbe@39: } jbe@39: } else if (timercmp(&newval.it_value, &oldval.it_value, <)) { jbe@0: struct itimerval remval; jbe@0: if (setitimer(ITIMER_REAL, &newval, NULL)) { jbe@0: moonbr_log(LOG_CRIT, "Could not set ITIMER_REAL via setitimer()"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: lua_call(L, lua_gettop(L) - 2, LUA_MULTRET); jbe@0: getitimer(ITIMER_REAL, &remval); jbe@0: timersub(&oldval.it_value, &newval.it_value, &newval.it_value); jbe@0: timeradd(&newval.it_value, &remval.it_value, &newval.it_value); jbe@0: if (setitimer(ITIMER_REAL, &newval, NULL)) { jbe@0: moonbr_log(LOG_CRIT, "Could not set ITIMER_REAL via setitimer()"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } else { jbe@0: lua_call(L, lua_gettop(L) - 2, LUA_MULTRET); jbe@0: } jbe@0: return lua_gettop(L) - 1; jbe@0: } jbe@0: } jbe@115: lua_pushnumber(L, oldval.it_value.tv_sec + 1e-6 * oldval.it_value.tv_usec); jbe@0: return 1; jbe@0: } jbe@0: jbe@264: #define moonbr_listen_init_pool_forkoption(luaname, cname, defval) do { \ jbe@0: lua_getfield(L, 2, luaname); \ jbe@0: pool->cname = lua_isnil(L, -1) ? (defval) : moonbr_lua_tonatural(L, -1); \ jbe@0: } while(0) jbe@0: jbe@0: #define moonbr_listen_init_pool_timeoption(luaname, cname, defval, defvalu) ( \ jbe@0: lua_getfield(L, 2, luaname), \ jbe@0: lua_isnil(L, -1) ? ( \ jbe@0: pool->cname.tv_sec = (defval), pool->cname.tv_usec = (defvalu), \ jbe@0: 1 \ jbe@0: ) : ( \ jbe@0: (lua_isboolean(L, -1) && !lua_toboolean(L, -1)) ? ( \ jbe@0: pool->cname.tv_sec = 0, pool->cname.tv_usec = 0, \ jbe@0: 1 \ jbe@0: ) : ( \ jbe@0: moonbr_lua_totimeval(L, -1, &pool->cname) \ jbe@0: ) \ jbe@0: ) \ jbe@0: ) jbe@0: jbe@0: static int moonbr_listen_init_pool(lua_State *L) { jbe@0: struct moonbr_pool *pool; jbe@0: const char *proto; jbe@274: int is_main; jbe@0: int i; jbe@127: int dynamic = 0; /* nonzero = listeners exist which require dynamic worker creation */ jbe@0: pool = lua_touserdata(L, 1); jbe@0: for (i=0; ilistener_count; i++) { jbe@0: struct moonbr_listener *listener = &pool->listener[i]; jbe@0: lua_settop(L, 2); jbe@3: #if LUA_VERSION_NUM >= 503 jbe@3: lua_geti(L, 2, i+1); jbe@3: #else jbe@0: lua_pushinteger(L, i+1); jbe@0: lua_gettable(L, 2); jbe@3: #endif jbe@0: lua_getfield(L, 3, "proto"); jbe@0: proto = lua_tostring(L, -1); jbe@274: is_main = !strcmp(proto, "main"); jbe@274: if (proto && (is_main || !strcmp(proto, "interval"))) { jbe@0: listener->proto = MOONBR_PROTO_INTERVAL; jbe@0: lua_getfield(L, 3, "name"); jbe@0: { jbe@0: const char *name = lua_tostring(L, -1); jbe@0: if (name) { jbe@211: char *name_dup = strdup(name); jbe@211: if (!name_dup) { jbe@0: moonbr_log(LOG_CRIT, "Memory allocation_error"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@211: listener->type_specific.interval.name = name_dup; jbe@0: } jbe@0: } jbe@274: if (is_main) { jbe@274: listener->type_specific.interval.main = 1; jbe@274: } else { jbe@274: listener->type_specific.interval.main = 0; jbe@274: dynamic = 1; jbe@274: lua_getfield(L, 3, "delay"); jbe@274: if ( jbe@274: !moonbr_lua_totimeval(L, -1, &listener->type_specific.interval.delay) || jbe@274: !timerisset(&listener->type_specific.interval.delay) jbe@274: ) { jbe@274: luaL_error(L, "No valid interval delay specified; use listen{{proto=\"interval\", delay=...}, ...}"); jbe@274: } jbe@274: lua_getfield(L, 3, "strict"); jbe@274: if (!lua_isnil(L, -1)) { jbe@274: if (lua_isboolean(L, -1)) { jbe@274: if (lua_toboolean(L, -1)) listener->type_specific.interval.strict = 1; jbe@274: } else { jbe@274: luaL_error(L, "Option \"strict\" must be a boolean if set; use listen{{proto=\"interval\", strict=true, ...}, ...}"); jbe@274: } jbe@0: } jbe@0: } jbe@0: } else if (proto && !strcmp(proto, "local")) { jbe@125: const char *path; jbe@125: const int path_maxlen = ( jbe@125: sizeof(listener->type_specific.socket.addr.addr_un) - jbe@125: ((void *)listener->type_specific.socket.addr.addr_un.sun_path - (void *)&listener->type_specific.socket.addr.addr_un) jbe@125: ) - 1; /* one byte for termination */ jbe@127: dynamic = 1; jbe@0: listener->proto = MOONBR_PROTO_LOCAL; jbe@0: lua_getfield(L, 3, "path"); jbe@125: path = lua_tostring(L, -1); jbe@125: if (!path) { jbe@125: luaL_error(L, "No valid path specified for local socket; use listen{{proto=\"local\", path=...}, ...}"); jbe@125: } jbe@125: if (strlen(path) > path_maxlen) { jbe@125: luaL_error(L, "Path name for local socket exceeded maximum length of %i characters", path_maxlen); jbe@125: } jbe@125: strcpy(listener->type_specific.socket.addr.addr_un.sun_path, path); jbe@125: } else if (proto && !strcmp(proto, "tcp")) { jbe@125: const char *host, *port; jbe@125: struct addrinfo hints = { 0, }; jbe@125: struct addrinfo *res, *addrinfo; jbe@125: int errcode; jbe@125: const char *ip; jbe@127: dynamic = 1; jbe@125: lua_getfield(L, 3, "host"); jbe@125: host = lua_isnil(L, -1) ? "::" : lua_tostring(L, -1); jbe@125: if (!host) { jbe@125: luaL_error(L, "No host specified; use listen{{proto=\"tcp\", host=...}, ...}"); jbe@125: } jbe@125: lua_getfield(L, 3, "port"); jbe@125: port = lua_tostring(L, -1); jbe@125: if (!port) { jbe@125: luaL_error(L, "No port specified; use listen{{proto=\"tcp\", host=...}, ...}"); jbe@125: } jbe@125: hints.ai_family = AF_UNSPEC; jbe@125: hints.ai_socktype = SOCK_STREAM; jbe@125: hints.ai_protocol = IPPROTO_TCP; jbe@125: hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE; jbe@125: errcode = getaddrinfo(host, port, &hints, &res); jbe@125: if (errcode) { jbe@125: freeaddrinfo(res); jbe@125: if (errcode == EAI_SYSTEM) { jbe@213: char errmsg[MOONBR_MAXSTRERRORLEN] = MOONBR_STRERROR_R_MSG; jbe@125: strerror_r(errno, errmsg, MOONBR_MAXSTRERRORLEN); /* use thread-safe call in case child created threads */ jbe@125: luaL_error(L, "Could not resolve host: %s: %s", gai_strerror(errcode), errmsg); jbe@125: } else { jbe@125: luaL_error(L, "Could not resolve host: %s", gai_strerror(errcode)); jbe@0: } jbe@0: } jbe@125: for (addrinfo=res; addrinfo; addrinfo=addrinfo->ai_next) { jbe@125: if (addrinfo->ai_family == AF_INET6) goto moonbr_listen_init_pool_found; jbe@125: } jbe@125: for (addrinfo=res; addrinfo; addrinfo=addrinfo->ai_next) { jbe@125: if (addrinfo->ai_family == AF_INET) goto moonbr_listen_init_pool_found; jbe@0: } jbe@125: addrinfo = res; jbe@125: moonbr_listen_init_pool_found: jbe@125: if (addrinfo->ai_addrlen > sizeof(listener->type_specific.socket.addr)) { jbe@125: moonbr_log(LOG_CRIT, "Size of ai_addrlen is unexpectedly big (should not happen)"); jbe@125: moonbr_terminate_error(); jbe@0: } jbe@125: memcpy(&listener->type_specific.socket.addr, addrinfo->ai_addr, addrinfo->ai_addrlen); jbe@125: listener->type_specific.socket.addrlen = addrinfo->ai_addrlen; jbe@125: switch (addrinfo->ai_family) { jbe@125: case AF_INET6: jbe@125: ip = inet_ntop( jbe@125: addrinfo->ai_family, jbe@125: &((struct sockaddr_in6 *)addrinfo->ai_addr)->sin6_addr, jbe@125: listener->proto_specific.tcp.ip, jbe@125: INET6_ADDRSTRLEN jbe@125: ); jbe@125: if (!ip) { jbe@125: moonbr_log(LOG_CRIT, "System error in inet_ntop call: %s", strerror(errno)); jbe@125: moonbr_terminate_error(); jbe@125: } jbe@125: listener->proto_specific.tcp.port = ntohs(((struct sockaddr_in6 *)addrinfo->ai_addr)->sin6_port); jbe@125: break; jbe@125: case AF_INET: jbe@125: ip = inet_ntop( jbe@125: addrinfo->ai_family, jbe@125: &((struct sockaddr_in *)addrinfo->ai_addr)->sin_addr, jbe@125: listener->proto_specific.tcp.ip, jbe@125: INET6_ADDRSTRLEN jbe@125: ); jbe@125: if (!ip) { jbe@125: moonbr_log(LOG_CRIT, "System error in inet_ntop call: %s", strerror(errno)); jbe@125: moonbr_terminate_error(); jbe@125: } jbe@125: listener->proto_specific.tcp.port = ntohs(((struct sockaddr_in *)addrinfo->ai_addr)->sin_port); jbe@125: break; jbe@125: default: jbe@125: strcpy(listener->proto_specific.tcp.ip, "unknown"); jbe@125: listener->proto_specific.tcp.port = 0; jbe@0: } jbe@125: listener->proto = MOONBR_PROTO_TCP; jbe@125: } else if (proto) { jbe@125: luaL_error(L, "Unknown protocol \"%s\"", proto); jbe@125: } else { jbe@125: luaL_error(L, "No valid protocol specified; use listen{{proto=..., ...}, ...}"); jbe@0: } jbe@0: } jbe@0: lua_settop(L, 2); jbe@127: if (dynamic) { jbe@127: moonbr_listen_init_pool_forkoption("pre_fork", pre_fork, 1); jbe@127: moonbr_listen_init_pool_forkoption("min_fork", min_fork, pool->pre_fork > 2 ? pool->pre_fork : 2); jbe@127: moonbr_listen_init_pool_forkoption("max_fork", max_fork, pool->min_fork > 16 ? pool->min_fork : 16); jbe@127: if (!moonbr_listen_init_pool_timeoption("fork_delay", fork_delay, 0, 250000)) { jbe@127: luaL_error(L, "Option \"fork_delay\" is expected to be a non-negative number"); jbe@127: } jbe@127: if (!moonbr_listen_init_pool_timeoption("fork_error_delay", fork_error_delay, 2, 0)) { jbe@127: luaL_error(L, "Option \"fork_error_delay\" is expected to be a non-negative number"); jbe@127: } jbe@127: if (!moonbr_listen_init_pool_timeoption("exit_delay", exit_delay, 60, 0)) { jbe@127: luaL_error(L, "Option \"exit_delay\" is expected to be a non-negative number"); jbe@127: } jbe@127: if (timercmp(&pool->fork_error_delay, &pool->fork_delay, <)) { jbe@127: pool->fork_error_delay = pool->fork_delay; jbe@127: } jbe@127: if (!moonbr_listen_init_pool_timeoption("idle_timeout", idle_timeout, 0, 0)) { jbe@127: luaL_error(L, "Option \"idle_timeout\" is expected to be a non-negative number"); jbe@127: } jbe@127: } else { jbe@127: pool->pre_fork = 0; jbe@127: pool->min_fork = pool->listener_count; jbe@127: pool->max_fork = pool->listener_count; jbe@0: } jbe@0: lua_getfield(L, 2, "memory_limit"); jbe@0: if (!lua_isnil(L, -1)) { jbe@0: int isnum; jbe@0: lua_Number n; jbe@0: n = lua_tonumberx(L, -1, &isnum); jbe@0: if (n < 0 || !isnum) { jbe@0: luaL_error(L, "Option \"memory_limit\" is expected to be a non-negative number"); jbe@0: } jbe@0: pool->memory_limit = n; jbe@0: } jbe@0: lua_settop(L, 2); jbe@0: lua_getfield(L, 2, "prepare"); jbe@0: if (!lua_isnil(L, -1) && !lua_isfunction(L, -1)) { jbe@0: luaL_error(L, "Option \"prepare\" must be nil or a function"); jbe@0: } jbe@0: lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_prepare_func(pool)); jbe@0: lua_getfield(L, 2, "connect"); jbe@0: if (!lua_isfunction(L, -1)) { jbe@0: luaL_error(L, "Option \"connect\" must be a function; use listen{{...}, {...}, connect=function(socket) ... end, ...}"); jbe@0: } jbe@0: lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_connect_func(pool)); jbe@0: lua_getfield(L, 2, "finish"); jbe@0: if (!lua_isnil(L, -1) && !lua_isfunction(L, -1)) { jbe@0: luaL_error(L, "Option \"finish\" must be nil or a function"); jbe@0: } jbe@0: lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_finish_func(pool)); jbe@0: return 0; jbe@0: } jbe@0: jbe@0: static int moonbr_listen(lua_State *L) { jbe@0: struct moonbr_pool *pool; jbe@0: lua_Integer listener_count; jbe@0: if (moonbr_booted) luaL_error(L, "Moonbridge bootup is already complete"); jbe@0: luaL_checktype(L, 1, LUA_TTABLE); jbe@0: listener_count = luaL_len(L, 1); jbe@0: if (!listener_count) luaL_error(L, "No listen ports specified; use listen{{proto=..., port=...},...}"); jbe@0: if (listener_count > 100) luaL_error(L, "Too many listeners"); jbe@0: pool = moonbr_create_pool(listener_count); jbe@0: lua_pushcfunction(L, moonbr_listen_init_pool); jbe@0: lua_pushlightuserdata(L, pool); jbe@0: lua_pushvalue(L, 1); jbe@0: if (lua_pcall(L, 2, 0, 0)) goto moonbr_listen_error; jbe@0: { jbe@0: int i; jbe@0: i = moonbr_start_pool(pool); jbe@0: if (i >= 0) { jbe@125: lua_pushfstring(L, "Could not initialize listener #%d: %s", i+1, strerror(errno)); jbe@125: moonbr_listen_error: jbe@125: moonbr_destroy_pool(pool); jbe@125: lua_pushnil(L); jbe@125: lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_prepare_func(pool)); jbe@125: lua_pushnil(L); jbe@125: lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_connect_func(pool)); jbe@125: lua_pushnil(L); jbe@125: lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_finish_func(pool)); jbe@125: lua_error(L); jbe@0: } jbe@0: } jbe@0: return 0; jbe@0: } jbe@0: jbe@0: jbe@9: /*** Function to modify Lua's library path and/or cpath ***/ jbe@9: jbe@9: #if defined(MOONBR_LUA_PATH) || defined(MOONBR_LUA_CPATH) jbe@9: static void moonbr_modify_path(lua_State *L, char *key, char *value) { jbe@9: int stackbase; jbe@9: stackbase = lua_gettop(L); jbe@9: lua_getglobal(L, "package"); jbe@9: lua_getfield(L, stackbase+1, key); jbe@9: { jbe@9: const char *current_str; jbe@9: size_t current_strlen; jbe@9: luaL_Buffer buf; jbe@9: current_str = lua_tolstring(L, stackbase+2, ¤t_strlen); jbe@9: luaL_buffinit(L, &buf); jbe@9: if (current_str) { jbe@9: lua_pushvalue(L, stackbase+2); jbe@9: luaL_addvalue(&buf); jbe@9: if (current_strlen && current_str[current_strlen-1] != ';') { jbe@9: luaL_addchar(&buf, ';'); jbe@9: } jbe@9: } jbe@9: luaL_addstring(&buf, value); jbe@9: luaL_pushresult(&buf); jbe@9: } jbe@9: lua_setfield(L, stackbase+1, key); jbe@9: lua_settop(L, stackbase); jbe@9: } jbe@9: #endif jbe@9: jbe@9: jbe@0: /*** Main function and command line invokation ***/ jbe@0: jbe@0: static void moonbr_usage(int err, const char *cmd) { jbe@0: FILE *out; jbe@0: out = err ? stderr : stdout; jbe@0: if (!cmd) cmd = "moonbridge"; jbe@0: fprintf(out, "Get this help message: %s {-h|--help}\n", cmd); jbe@0: fprintf(out, "Usage: %s \\\n", cmd); jbe@0: fprintf(out, " [-b|--background] \\\n"); jbe@0: fprintf(out, " [-d|--debug] \\\n"); jbe@0: fprintf(out, " [-f|--logfacility {DAEMON|USER|0|1|...|7}] \\\n"); jbe@0: fprintf(out, " [-i|--logident \\\n"); jbe@0: fprintf(out, " [-l|--logfile ] \\\n"); jbe@0: fprintf(out, " [-p|--pidfile ] \\\n"); jbe@0: fprintf(out, " [-s|--stats] \\\n"); jbe@2: fprintf(out, " -- []\n"); jbe@0: exit(err); jbe@0: } jbe@0: jbe@0: #define moonbr_usage_error() moonbr_usage(MOONBR_EXITCODE_CMDLINEERROR, argc ? argv[0] : NULL) jbe@0: jbe@0: int main(int argc, char **argv) { jbe@0: { jbe@0: int daemonize = 0; jbe@0: int log_facility = LOG_USER; jbe@0: const char *log_ident = "moonbridge"; jbe@0: const char *log_filename = NULL; jbe@0: const char *pid_filename = NULL; jbe@0: int option; jbe@0: struct option longopts[] = { jbe@0: { "background", no_argument, NULL, 'b' }, jbe@0: { "debug", no_argument, NULL, 'd' }, jbe@0: { "logfacility", required_argument, NULL, 'f' }, jbe@0: { "help", no_argument, NULL, 'h' }, jbe@0: { "logident", required_argument, NULL, 'i' }, jbe@0: { "logfile", required_argument, NULL, 'l' }, jbe@0: { "pidfile", required_argument, NULL, 'p' }, jbe@231: { "stats", no_argument, NULL, 's' }, jbe@231: { NULL, 0, NULL, 0 } jbe@0: }; jbe@0: while ((option = getopt_long(argc, argv, "bdf:hi:l:p:s", longopts, NULL)) != -1) { jbe@0: switch (option) { jbe@0: case 'b': jbe@0: daemonize = 1; jbe@0: break; jbe@0: case 'd': jbe@0: moonbr_debug = 1; jbe@0: moonbr_stat = 1; jbe@0: break; jbe@0: case 'f': jbe@0: if (!strcmp(optarg, "DAEMON")) { jbe@0: log_facility = LOG_DAEMON; jbe@0: } else if (!strcmp(optarg, "USER")) { jbe@0: log_facility = LOG_USER; jbe@0: } else if (!strcmp(optarg, "0")) { jbe@0: log_facility = LOG_LOCAL0; jbe@0: } else if (!strcmp(optarg, "1")) { jbe@0: log_facility = LOG_LOCAL1; jbe@0: } else if (!strcmp(optarg, "2")) { jbe@0: log_facility = LOG_LOCAL2; jbe@0: } else if (!strcmp(optarg, "3")) { jbe@0: log_facility = LOG_LOCAL3; jbe@0: } else if (!strcmp(optarg, "4")) { jbe@0: log_facility = LOG_LOCAL4; jbe@0: } else if (!strcmp(optarg, "5")) { jbe@0: log_facility = LOG_LOCAL5; jbe@0: } else if (!strcmp(optarg, "6")) { jbe@0: log_facility = LOG_LOCAL6; jbe@0: } else if (!strcmp(optarg, "7")) { jbe@0: log_facility = LOG_LOCAL7; jbe@0: } else { jbe@0: moonbr_usage_error(); jbe@0: } jbe@0: moonbr_use_syslog = 1; jbe@0: break; jbe@0: case 'h': jbe@0: moonbr_usage(MOONBR_EXITCODE_GRACEFUL, argv[0]); jbe@0: break; jbe@0: case 'i': jbe@0: log_ident = optarg; jbe@0: moonbr_use_syslog = 1; jbe@0: break; jbe@0: case 'l': jbe@0: log_filename = optarg; jbe@0: break; jbe@0: case 'p': jbe@0: pid_filename = optarg; jbe@0: break; jbe@0: case 's': jbe@0: moonbr_stat = 1; jbe@0: break; jbe@0: default: jbe@0: moonbr_usage_error(); jbe@0: } jbe@0: } jbe@2: if (argc - optind < 1) moonbr_usage_error(); jbe@0: if (pid_filename) { jbe@0: pid_t otherpid; jbe@0: while ((moonbr_pidfh = pidfile_open(pid_filename, 0644, &otherpid)) == NULL) { jbe@0: if (errno == EEXIST) { jbe@0: if (otherpid == -1) { jbe@0: fprintf(stderr, "PID file \"%s\" is already locked\n", pid_filename); jbe@0: } else { jbe@0: fprintf(stderr, "PID file \"%s\" is already locked by process with PID: %i\n", pid_filename, (int)otherpid); jbe@0: } jbe@0: exit(MOONBR_EXITCODE_ALREADYRUNNING); jbe@0: } else if (errno != EINTR) { jbe@0: fprintf(stderr, "Could not write PID file \"%s\": %s\n", pid_filename, strerror(errno)); jbe@0: exit(MOONBR_EXITCODE_STARTUPERROR); jbe@0: } jbe@0: } jbe@0: } jbe@0: if (log_filename) { jbe@0: int logfd; jbe@0: while ( jbe@0: ( logfd = flopen( jbe@0: log_filename, jbe@0: O_WRONLY|O_NONBLOCK|O_CREAT|O_APPEND|O_CLOEXEC, jbe@0: 0640 jbe@0: ) jbe@0: ) < 0 jbe@0: ) { jbe@0: if (errno == EWOULDBLOCK) { jbe@0: fprintf(stderr, "Logfile \"%s\" is locked\n", log_filename); jbe@0: exit(MOONBR_EXITCODE_ALREADYRUNNING); jbe@0: } else if (errno != EINTR) { jbe@0: fprintf(stderr, "Could not open logfile \"%s\": %s\n", log_filename, strerror(errno)); jbe@0: exit(MOONBR_EXITCODE_STARTUPERROR); jbe@0: } jbe@0: } jbe@0: moonbr_logfile = fdopen(logfd, "a"); jbe@0: if (!moonbr_logfile) { jbe@0: fprintf(stderr, "Could not open write stream to logfile \"%s\": %s\n", log_filename, strerror(errno)); jbe@0: exit(MOONBR_EXITCODE_STARTUPERROR); jbe@0: } jbe@0: } jbe@0: if (daemonize == 0 && !moonbr_logfile) moonbr_logfile = stderr; jbe@0: if (moonbr_logfile) setlinebuf(moonbr_logfile); jbe@0: else moonbr_use_syslog = 1; jbe@0: if (moonbr_use_syslog) openlog(log_ident, LOG_NDELAY | LOG_PID, log_facility); jbe@0: if (daemonize) { jbe@0: if (daemon(1, 0)) { jbe@0: moonbr_log(LOG_ERR, "Could not daemonize moonbridge process"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: } jbe@0: } jbe@0: moonbr_log(LOG_NOTICE, "Starting moonbridge server"); jbe@0: if (moonbr_pidfh && pidfile_write(moonbr_pidfh)) { jbe@0: moonbr_log(LOG_ERR, "Could not write pidfile (after locking)"); jbe@0: } jbe@0: { jbe@0: lua_State *L; jbe@0: L = lua_newstate(moonbr_alloc, NULL); jbe@0: if (!L) { jbe@0: moonbr_log(LOG_CRIT, "Could not initialize Lua state"); jbe@0: moonbr_terminate_error(); jbe@0: } jbe@0: lua_atpanic(L, moonbr_lua_panic); jbe@58: lua_pushliteral(L, MOONBR_VERSION_STRING); jbe@58: lua_setglobal(L, "_MOONBRIDGE_VERSION"); jbe@0: luaL_openlibs(L); jbe@89: luaL_requiref(L, "moonbridge_io", luaopen_moonbridge_io, 1); jbe@89: lua_pop(L, 1); jbe@8: #ifdef MOONBR_LUA_PATH jbe@9: moonbr_modify_path(L, "path", MOONBR_LUA_PATH); jbe@9: #endif jbe@9: #ifdef MOONBR_LUA_CPATH jbe@9: moonbr_modify_path(L, "cpath", MOONBR_LUA_CPATH); jbe@8: #endif jbe@0: lua_pushcfunction(L, moonbr_timeout); jbe@0: lua_setglobal(L, "timeout"); jbe@0: lua_pushcfunction(L, moonbr_listen); jbe@0: lua_setglobal(L, "listen"); jbe@17: lua_pushcfunction(L, moonbr_addtraceback); /* on stack position 1 */ jbe@2: moonbr_log(LOG_INFO, "Loading \"%s\"", argv[optind]); jbe@2: if (luaL_loadfile(L, argv[optind])) { jbe@2: moonbr_log(LOG_ERR, "Error while loading \"%s\": %s", argv[optind], lua_tostring(L, -1)); jbe@2: moonbr_terminate_error(); jbe@2: } jbe@2: { int i; for (i=optind+1; i