diff --git a/config b/config index 921004b5..882dd23b 100644 --- a/config +++ b/config @@ -336,6 +336,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/src/ngx_stream_lua_initby.c \ $ngx_addon_dir/src/ngx_stream_lua_args.c \ $ngx_addon_dir/src/ngx_stream_lua_initworkerby.c \ + $ngx_addon_dir/src/ngx_stream_lua_balancer.c \ " NGX_ADDON_DEPS="$NGX_ADDON_DEPS \ @@ -373,6 +374,7 @@ NGX_ADDON_DEPS="$NGX_ADDON_DEPS \ $ngx_addon_dir/src/ngx_stream_lua_initby.h \ $ngx_addon_dir/src/ngx_stream_lua_args.h \ $ngx_addon_dir/src/ngx_stream_lua_initworkerby.h \ + $ngx_addon_dir/src/ngx_stream_lua_balancer.h \ " ngx_feature="export symbols by default (-E)" diff --git a/src/ngx_stream_lua_balancer.c b/src/ngx_stream_lua_balancer.c new file mode 100644 index 00000000..98124f2f --- /dev/null +++ b/src/ngx_stream_lua_balancer.c @@ -0,0 +1,605 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef DDEBUG +#define DDEBUG 0 +#endif +#include "ddebug.h" + + +#include "ngx_stream_lua_cache.h" +#include "ngx_stream_lua_balancer.h" +#include "ngx_stream_lua_util.h" +#include "ngx_stream_lua_directive.h" + + +struct ngx_stream_lua_balancer_peer_data_s { + /* the round robin data must be first */ + ngx_stream_upstream_rr_peer_data_t rrp; + + ngx_stream_lua_srv_conf_t *conf; + ngx_stream_session_t *session; + + ngx_event_get_peer_pt get_rr_peer; + + ngx_uint_t more_tries; + ngx_uint_t total_tries; + + struct sockaddr *sockaddr; + socklen_t socklen; + + ngx_str_t host; + in_port_t port; + + int last_peer_state; +}; + + +static ngx_int_t ngx_stream_lua_balancer_init(ngx_conf_t *cf, + ngx_stream_upstream_srv_conf_t *us); +static ngx_int_t ngx_stream_lua_balancer_init_peer(ngx_stream_session_t *s, + ngx_stream_upstream_srv_conf_t *us); +static ngx_int_t ngx_stream_lua_balancer_get_peer(ngx_peer_connection_t *pc, + void *data); +static ngx_int_t ngx_stream_lua_balancer_by_chunk(lua_State *L, + ngx_log_t *log, ngx_stream_session_t *s); +void ngx_stream_lua_balancer_free_peer(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state); + + +ngx_int_t +ngx_stream_lua_balancer_handler_file(ngx_stream_session_t *s, ngx_log_t *log, + ngx_stream_lua_srv_conf_t *lscf, lua_State *L) +{ + ngx_int_t rc; + + rc = ngx_stream_lua_cache_loadfile(log, L, + lscf->balancer.src.data, + lscf->balancer.src_key); + if (rc != NGX_OK) { + return rc; + } + + /* make sure we have a valid code chunk */ + ngx_stream_lua_assert(lua_isfunction(L, -1)); + + return ngx_stream_lua_balancer_by_chunk(L, log, s); +} + + +ngx_int_t +ngx_stream_lua_balancer_handler_inline(ngx_stream_session_t *s, ngx_log_t *log, + ngx_stream_lua_srv_conf_t *lscf, lua_State *L) +{ + ngx_int_t rc; + + rc = ngx_stream_lua_cache_loadbuffer(log, L, + lscf->balancer.src.data, + lscf->balancer.src.len, + lscf->balancer.src_key, + "=balancer_by_lua"); + if (rc != NGX_OK) { + return rc; + } + + /* make sure we have a valid code chunk */ + ngx_stream_lua_assert(lua_isfunction(L, -1)); + + return ngx_stream_lua_balancer_by_chunk(L, log, s); +} + + +char * +ngx_stream_lua_balancer_by_lua_block(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf) +{ + char *rv; + ngx_conf_t save; + + save = *cf; + cf->handler = ngx_stream_lua_balancer_by_lua; + cf->handler_conf = conf; + + rv = ngx_stream_lua_conf_lua_block_parse(cf, cmd); + + *cf = save; + + return rv; +} + + +char * +ngx_stream_lua_balancer_by_lua(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf) +{ + u_char *p; + u_char *name; + ngx_str_t *value; + ngx_stream_lua_srv_conf_t *lscf = conf; + + ngx_stream_upstream_srv_conf_t *uscf; + + dd("enter"); + + /* must specifiy a content handler */ + if (cmd->post == NULL) { + return NGX_CONF_ERROR; + } + + if (lscf->balancer.handler) { + return "is duplicate"; + } + + value = cf->args->elts; + + lscf->balancer.handler = (ngx_stream_lua_srv_conf_handler_pt) cmd->post; + + if (cmd->post == ngx_stream_lua_balancer_handler_file) { + /* Lua code in an external file */ + + name = ngx_stream_lua_rebase_path(cf->pool, value[1].data, + value[1].len); + if (name == NULL) { + return NGX_CONF_ERROR; + } + + lscf->balancer.src.data = name; + lscf->balancer.src.len = ngx_strlen(name); + + p = ngx_palloc(cf->pool, NGX_STREAM_LUA_FILE_KEY_LEN + 1); + if (p == NULL) { + return NGX_CONF_ERROR; + } + + lscf->balancer.src_key = p; + + p = ngx_copy(p, NGX_STREAM_LUA_FILE_TAG, NGX_STREAM_LUA_FILE_TAG_LEN); + p = ngx_stream_lua_digest_hex(p, value[1].data, value[1].len); + *p = '\0'; + + } else { + /* inlined Lua code */ + lscf->balancer.src = value[1]; + + p = ngx_palloc(cf->pool, NGX_STREAM_LUA_INLINE_KEY_LEN + 1); + if (p == NULL) { + return NGX_CONF_ERROR; + } + + lscf->balancer.src_key = p; + + p = ngx_copy(p, NGX_STREAM_LUA_INLINE_TAG, NGX_STREAM_LUA_INLINE_TAG_LEN); + p = ngx_stream_lua_digest_hex(p, value[1].data, value[1].len); + *p = '\0'; + } + + uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module); + + if (uscf->peer.init_upstream) { + ngx_conf_log_error(NGX_LOG_WARN, cf, 0, + "load balancing method redefined"); + } + + uscf->peer.init_upstream = ngx_stream_lua_balancer_init; + + uscf->flags = NGX_STREAM_UPSTREAM_CREATE + |NGX_STREAM_UPSTREAM_WEIGHT + |NGX_STREAM_UPSTREAM_MAX_FAILS + |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT + |NGX_STREAM_UPSTREAM_DOWN; + + return NGX_CONF_OK; +} + + +static ngx_int_t +ngx_stream_lua_balancer_init(ngx_conf_t *cf, + ngx_stream_upstream_srv_conf_t *us) +{ + if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) { + return NGX_ERROR; + } + + /* this callback is called upon individual sessions */ + us->peer.init = ngx_stream_lua_balancer_init_peer; + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_lua_balancer_init_peer(ngx_stream_session_t *s, + ngx_stream_upstream_srv_conf_t *us) +{ + ngx_stream_lua_srv_conf_t *bcf; + ngx_stream_lua_balancer_peer_data_t *bp; + + bp = ngx_pcalloc(s->connection->pool, sizeof(ngx_stream_lua_balancer_peer_data_t)); + if (bp == NULL) { + return NGX_ERROR; + } + + s->upstream->peer.data = &bp->rrp; + + if (ngx_stream_upstream_init_round_robin_peer(s, us) != NGX_OK) { + return NGX_ERROR; + } + + s->upstream->peer.get = ngx_stream_lua_balancer_get_peer; + s->upstream->peer.free = ngx_stream_lua_balancer_free_peer; + + bcf = ngx_stream_conf_upstream_srv_conf(us, ngx_stream_lua_module); + + bp->conf = bcf; + bp->get_rr_peer = ngx_stream_upstream_get_round_robin_peer; + bp->session = s; + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_lua_balancer_get_peer(ngx_peer_connection_t *pc, void *data) +{ + lua_State *L; + ngx_int_t rc; + ngx_stream_session_t *s; + ngx_stream_lua_ctx_t *ctx; + ngx_stream_lua_srv_conf_t *lscf; + ngx_stream_lua_main_conf_t *lmcf; + ngx_stream_lua_balancer_peer_data_t *bp = data; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "lua balancer peer, try: %ui", pc->tries); + + lscf = bp->conf; + + s = bp->session; + + ngx_stream_lua_assert(lscf->balancer.handler && s); + + ctx = ngx_stream_get_module_ctx(s, ngx_stream_lua_module); + + if (ctx == NULL) { + ctx = ngx_stream_lua_create_ctx(s); + if (ctx == NULL) { + return NGX_ERROR; + } + + L = ngx_stream_lua_get_lua_vm(s, ctx); + + } else { + L = ngx_stream_lua_get_lua_vm(s, ctx); + + dd("reset ctx"); + ngx_stream_lua_reset_ctx(s, L, ctx); + } + + ctx->context = NGX_STREAM_LUA_CONTEXT_BALANCER; + + bp->sockaddr = NULL; + bp->socklen = 0; + bp->more_tries = 0; + bp->total_tries++; + + lmcf = ngx_stream_get_module_main_conf(s, ngx_stream_lua_module); + + /* balancer_by_lua does not support yielding and + * there cannot be any conflicts among concurrent sessions, + * thus it is safe to store the peer data in the main conf. + */ + lmcf->balancer_peer_data = bp; + + rc = lscf->balancer.handler(s, pc->log, lscf, L); + + if (rc == NGX_ERROR) { + return NGX_ERROR; + } + + if (ctx->exited && ctx->exit_code != NGX_OK) { + rc = ctx->exit_code; + if (rc == NGX_ERROR || rc == NGX_BUSY || rc == NGX_DECLINED) { + return rc; + } + + if (rc > NGX_OK) { + return NGX_ERROR; + } + } + + if (bp->sockaddr && bp->socklen) { + pc->sockaddr = bp->sockaddr; + pc->socklen = bp->socklen; + pc->name = &bp->host; + bp->rrp.peers->single = 0; + + if (bp->more_tries) { + s->upstream->peer.tries += bp->more_tries; + } + + dd("tries: %d", (int) s->upstream->peer.tries); + + return NGX_OK; + } + + return bp->get_rr_peer(pc, &bp->rrp); +} + + +static ngx_int_t +ngx_stream_lua_balancer_by_chunk(lua_State *L, ngx_log_t *log, ngx_stream_session_t *s) +{ + u_char *err_msg; + size_t len; + ngx_int_t rc; + + /* init nginx context in Lua VM */ + ngx_stream_lua_set_session(L, s); + ngx_stream_lua_create_new_globals_table(L, 0 /* narr */, 1 /* nrec */); + + /* {{{ make new env inheriting main thread's globals table */ + lua_createtable(L, 0, 1 /* nrec */); /* the metatable for the new env */ + ngx_stream_lua_get_globals_table(L); + lua_setfield(L, -2, "__index"); + lua_setmetatable(L, -2); /* setmetatable({}, {__index = _G}) */ + /* }}} */ + + lua_setfenv(L, -2); /* set new running env for the code closure */ + + lua_pushcfunction(L, ngx_stream_lua_traceback); + lua_insert(L, 1); /* put it under chunk and args */ + + /* protected call user code */ + rc = lua_pcall(L, 0, 1, 1); + + lua_remove(L, 1); /* remove traceback function */ + + dd("rc == %d", (int) rc); + + if (rc != 0) { + /* error occured when running loaded code */ + err_msg = (u_char *) lua_tolstring(L, -1, &len); + + if (err_msg == NULL) { + err_msg = (u_char *) "unknown reason"; + len = sizeof("unknown reason") - 1; + } + + ngx_log_error(NGX_LOG_ERR, log, 0, + "failed to run balancer_by_lua*: %*s", len, err_msg); + + lua_settop(L, 0); /* clear remaining elems on stack */ + + return NGX_ERROR; + } + + lua_settop(L, 0); /* clear remaining elems on stack */ + return rc; +} + + +void +ngx_stream_lua_balancer_free_peer(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state) +{ + ngx_stream_lua_balancer_peer_data_t *bp = data; + + if (bp->sockaddr && bp->socklen) { + bp->last_peer_state = (int) state; + + if (pc->tries) { + pc->tries--; + } + + return; + } + + /* fallback */ + + ngx_stream_upstream_free_round_robin_peer(pc, data, state); +} + + +#ifndef NGX_LUA_NO_FFI_API + +int +ngx_stream_lua_ffi_balancer_set_current_peer(ngx_stream_session_t *s, + const u_char *adds, size_t addr_len, int port, char **err) +{ + ngx_url_t url; + ngx_stream_lua_ctx_t *ctx; + ngx_stream_upstream_t *u; + + ngx_stream_lua_main_conf_t *lmcf; + ngx_stream_lua_balancer_peer_data_t *bp; + + if (s == NULL) { + *err = "no session found"; + return NGX_ERROR; + } + + ctx = ngx_stream_get_module_ctx(s, ngx_stream_lua_module); + if (ctx == NULL) { + *err = "no ctx found"; + return NGX_ERROR; + } + + if ((ctx->context & NGX_STREAM_LUA_CONTEXT_BALANCER) == 0) { + *err = "API disabled in the current context"; + return NGX_ERROR; + } + + u = s->upstream; + + if (u == NULL) { + *err = "no upstream found"; + return NGX_ERROR; + } + + lmcf = ngx_stream_get_module_main_conf(s, ngx_stream_lua_module); + + /* we cannot read s->upstream->peer.data here directly because + * it could be overridden by other modules like + * ngx_stream_upstream_keepalive_module. + */ + bp = lmcf->balancer_peer_data; + if (bp == NULL) { + *err = "no upstream peer data found"; + return NGX_ERROR; + } + + ngx_memzero(&url, sizeof(ngx_url_t)); + + url.url.data = ngx_palloc(s->connection->pool, addr_len); + if (url.url.data == NULL) { + *err = "no memory"; + return NGX_ERROR; + } + + ngx_memcpy(url.url.data, adds, addr_len); + + url.url.len = addr_len; + url.default_port = (in_port_t) port; + url.uri_part = 0; + url.no_resolve = 1; + + if (ngx_parse_url(s->connection->pool, &url) != NGX_OK) { + if (url.err) { + *err = url.err; + } + + return NGX_ERROR; + } + + if (url.addrs && url.addrs[0].sockaddr) { + bp->sockaddr = url.addrs[0].sockaddr; + bp->socklen = url.addrs[0].socklen; + bp->host = url.addrs[0].name; + + } else { + *err = "no host allowed"; + return NGX_ERROR; + } + + return NGX_OK; +} + + +int +ngx_stream_lua_ffi_balancer_set_more_tries(ngx_stream_session_t *s, + int count, char **err) +{ + ngx_stream_lua_ctx_t *ctx; + ngx_stream_upstream_t *u; + + ngx_stream_lua_main_conf_t *lmcf; + ngx_stream_lua_balancer_peer_data_t *bp; + + if (s == NULL) { + *err = "no session found"; + return NGX_ERROR; + } + + ctx = ngx_stream_get_module_ctx(s, ngx_stream_lua_module); + if (ctx == NULL) { + *err = "no ctx found"; + return NGX_ERROR; + } + + if ((ctx->context & NGX_STREAM_LUA_CONTEXT_BALANCER) == 0) { + *err = "API disabled in the current context"; + return NGX_ERROR; + } + + u = s->upstream; + + if (u == NULL) { + *err = "no upstream found"; + return NGX_ERROR; + } + + lmcf = ngx_stream_get_module_main_conf(s, ngx_stream_lua_module); + + bp = lmcf->balancer_peer_data; + if (bp == NULL) { + *err = "no upstream peer data found"; + return NGX_ERROR; + } + + *err = NULL; + + bp->more_tries = count; + return NGX_OK; +} + + +int +ngx_stream_lua_ffi_balancer_get_last_failure(ngx_stream_session_t *s, + int *status, char **err) +{ + + /* This is not yet implemented. + * The stream module does not appear to have this available + * Or at-least it does not yet + */ + + ngx_stream_lua_ctx_t *ctx; + ngx_stream_upstream_t *u; + /* ngx_stream_upstream_state_t *state; */ + + ngx_stream_lua_balancer_peer_data_t *bp; + ngx_stream_lua_main_conf_t *lmcf; + + if (s == NULL) { + *err = "no session found"; + return NGX_ERROR; + } + + ctx = ngx_stream_get_module_ctx(s, ngx_stream_lua_module); + if (ctx == NULL) { + *err = "no ctx found"; + return NGX_ERROR; + } + + if ((ctx->context & NGX_STREAM_LUA_CONTEXT_BALANCER) == 0) { + *err = "API disabled in the current context"; + return NGX_ERROR; + } + + u = s->upstream; + + if (u == NULL) { + *err = "no upstream found"; + return NGX_ERROR; + } + + lmcf = ngx_stream_get_module_main_conf(s, ngx_stream_lua_module); + + bp = lmcf->balancer_peer_data; + if (bp == NULL) { + *err = "no upstream peer data found"; + return NGX_ERROR; + } + + /*if (r->upstream_states && s->upstream_states->nelts > 1) { + state = s->upstream_states->elts; + *status = (int) state[r->upstream_states->nelts - 2].status; + + } else { + *status = 0; + }*/ + + /* + * Given that the stream upstream module is very basic in comparison + * with the http upstream module, we do not have any status output + * available at this moment. + */ + *status = 0; + + return bp->last_peer_state; +} + +#endif /* NGX_LUA_NO_FFI_API */ diff --git a/src/ngx_stream_lua_balancer.h b/src/ngx_stream_lua_balancer.h new file mode 100644 index 00000000..df98289a --- /dev/null +++ b/src/ngx_stream_lua_balancer.h @@ -0,0 +1,27 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef _NGX_STREAM_LUA_BALANCER_H_INCLUDED_ +#define _NGX_STREAM_LUA_BALANCER_H_INCLUDED_ + + +#include "ngx_stream_lua_common.h" + + +ngx_int_t ngx_stream_lua_balancer_handler_inline(ngx_stream_session_t *s, ngx_log_t *r, + ngx_stream_lua_srv_conf_t *lscf, lua_State *L); + +ngx_int_t ngx_stream_lua_balancer_handler_file(ngx_stream_session_t *s, ngx_log_t *r, + ngx_stream_lua_srv_conf_t *lscf, lua_State *L); + +char *ngx_stream_lua_balancer_by_lua(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + +char *ngx_stream_lua_balancer_by_lua_block(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + + +#endif /* _NGX_HTTP_LUA_BALANCER_H_INCLUDED_ */ diff --git a/src/ngx_stream_lua_common.h b/src/ngx_stream_lua_common.h index 443d94a3..588f9797 100644 --- a/src/ngx_stream_lua_common.h +++ b/src/ngx_stream_lua_common.h @@ -61,6 +61,7 @@ #define NGX_STREAM_LUA_CONTEXT_LOG 0x002 #define NGX_STREAM_LUA_CONTEXT_TIMER 0x004 #define NGX_STREAM_LUA_CONTEXT_INIT_WORKER 0x008 +#define NGX_STREAM_LUA_CONTEXT_BALANCER 0x010 /* Nginx Stream Lua Inline tag prefix */ @@ -90,6 +91,7 @@ typedef void (*ngx_stream_lua_cleanup_pt)(void *data); typedef struct ngx_stream_lua_cleanup_s ngx_stream_lua_cleanup_t; + struct ngx_stream_lua_cleanup_s { ngx_stream_lua_cleanup_pt handler; void *data; @@ -118,10 +120,15 @@ typedef struct { typedef struct ngx_stream_lua_semaphore_mm_s ngx_stream_lua_semaphore_mm_t; typedef struct ngx_stream_lua_main_conf_s ngx_stream_lua_main_conf_t; +typedef struct ngx_stream_lua_srv_conf_s ngx_stream_lua_srv_conf_t; +typedef struct ngx_stream_lua_balancer_peer_data_s + ngx_stream_lua_balancer_peer_data_t; typedef ngx_int_t (*ngx_stream_lua_main_conf_handler_pt)(ngx_log_t *log, ngx_stream_lua_main_conf_t *lmcf, lua_State *L); +typedef ngx_int_t (*ngx_stream_lua_srv_conf_handler_pt)(ngx_stream_session_t *s, ngx_log_t *log, + ngx_stream_lua_srv_conf_t *lscf, lua_State *L); typedef struct { @@ -168,6 +175,12 @@ struct ngx_stream_lua_main_conf_s { ngx_stream_lua_semaphore_mm_t *semaphore_mm; + ngx_stream_lua_balancer_peer_data_t *balancer_peer_data; + /* balancer_by_lua does not support yielding and + * there cannot be any conflicts among concurrent requests, + * thus it is safe to store the peer data in the main conf. + */ + unsigned requires_access:1; unsigned requires_shm:1; }; @@ -182,7 +195,7 @@ typedef void (*ngx_stream_lua_event_handler_pt)(ngx_stream_session_t *s, ngx_stream_lua_ctx_t *ctx); -typedef struct { +struct ngx_stream_lua_srv_conf_s { #if (NGX_STREAM_SSL) ngx_ssl_t *ssl; /* shared by SSL cosockets */ @@ -229,7 +242,13 @@ typedef struct { ngx_msec_t lingering_time; ngx_msec_t lingering_timeout; -} ngx_stream_lua_srv_conf_t; + struct { + ngx_str_t src; + u_char *src_key; + ngx_stream_lua_srv_conf_handler_pt handler; + } balancer; + +}; enum { diff --git a/src/ngx_stream_lua_control.c b/src/ngx_stream_lua_control.c index bf1690c5..93f559df 100644 --- a/src/ngx_stream_lua_control.c +++ b/src/ngx_stream_lua_control.c @@ -57,7 +57,8 @@ ngx_stream_lua_ngx_exit(lua_State *L) } ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_CONTENT - | NGX_STREAM_LUA_CONTEXT_TIMER); + | NGX_STREAM_LUA_CONTEXT_TIMER + | NGX_STREAM_LUA_CONTEXT_BALANCER); rc = (ngx_int_t) luaL_checkinteger(L, 1); @@ -69,6 +70,11 @@ ngx_stream_lua_ngx_exit(lua_State *L) ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, "stream lua exit with code %i", ctx->exit_code); + if (ctx->context & NGX_STREAM_LUA_CONTEXT_BALANCER) + { + return 0; + } + dd("calling yield"); return lua_yield(L, 0); } @@ -145,7 +151,8 @@ ngx_stream_lua_ffi_exit(ngx_stream_session_t *s, int status, u_char *err, } if (ngx_stream_lua_ffi_check_context(ctx, NGX_STREAM_LUA_CONTEXT_CONTENT - | NGX_STREAM_LUA_CONTEXT_TIMER, + | NGX_STREAM_LUA_CONTEXT_TIMER + | NGX_STREAM_LUA_CONTEXT_BALANCER, err, errlen) != NGX_OK) { @@ -158,6 +165,11 @@ ngx_stream_lua_ffi_exit(ngx_stream_session_t *s, int status, u_char *err, ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, "stream lua exit with code %i", ctx->exit_code); + if (ctx->context & NGX_STREAM_LUA_CONTEXT_BALANCER) + { + return 0; + } + return NGX_OK; } #endif /* NGX_LUA_NO_FFI_API */ diff --git a/src/ngx_stream_lua_module.c b/src/ngx_stream_lua_module.c index 4d0d3c73..07041456 100644 --- a/src/ngx_stream_lua_module.c +++ b/src/ngx_stream_lua_module.c @@ -14,6 +14,7 @@ #include "ngx_stream_lua_common.h" #include "ngx_stream_lua_directive.h" #include "ngx_stream_lua_contentby.h" +#include "ngx_stream_lua_balancer.h" #include "ngx_stream_lua_semaphore.h" #include "ngx_stream_lua_initby.h" #include "ngx_stream_lua_initworkerby.h" @@ -104,6 +105,20 @@ static ngx_command_t ngx_stream_lua_commands[] = { 0, (void *) ngx_stream_lua_content_handler_file }, + { ngx_string("balancer_by_lua_block"), + NGX_STREAM_UPS_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS, + ngx_stream_lua_balancer_by_lua_block, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + (void *) ngx_stream_lua_balancer_handler_inline }, + + { ngx_string("balancer_by_lua_file"), + NGX_STREAM_UPS_CONF|NGX_CONF_TAKE1, + ngx_stream_lua_balancer_by_lua, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + (void *) ngx_stream_lua_balancer_handler_file }, + { ngx_string("lua_max_running_timers"), NGX_STREAM_MAIN_CONF|NGX_CONF_TAKE1, ngx_conf_set_num_slot, diff --git a/src/ngx_stream_lua_phase.c b/src/ngx_stream_lua_phase.c index 436305cf..d4bc4235 100644 --- a/src/ngx_stream_lua_phase.c +++ b/src/ngx_stream_lua_phase.c @@ -58,6 +58,10 @@ ngx_stream_lua_ngx_get_phase(lua_State *L) lua_pushliteral(L, "timer"); break; + case NGX_STREAM_LUA_CONTEXT_BALANCER: + lua_pushliteral(L, "balancer"); + break; + default: return luaL_error(L, "unknown phase: %d", (int) ctx->context); } diff --git a/src/ngx_stream_lua_util.h b/src/ngx_stream_lua_util.h index 8b646648..4ed5aa06 100644 --- a/src/ngx_stream_lua_util.h +++ b/src/ngx_stream_lua_util.h @@ -88,6 +88,7 @@ void ngx_stream_lua_set_multi_value_table(lua_State *L, int index); : (c) == NGX_STREAM_LUA_CONTEXT_LOG ? "log_by_lua*" \ : (c) == NGX_STREAM_LUA_CONTEXT_TIMER ? "ngx.timer" \ : (c) == NGX_STREAM_LUA_CONTEXT_INIT_WORKER ? "init_worker_by_lua*" \ + : (c) == NGX_STREAM_LUA_CONTEXT_BALANCER ? "balancer_by_lua*" \ : "(unknown)") diff --git a/t/138-balancer.t b/t/138-balancer.t new file mode 100644 index 00000000..b675a71a --- /dev/null +++ b/t/138-balancer.t @@ -0,0 +1,136 @@ +# vim:set ft= ts=4 sw=4 et fdm=marker: + +use Test::Nginx::Socket::Lua::Stream; + +#worker_connections(1014); +#master_on(); +#workers(2); +#log_level('warn'); + +repeat_each(2); + +plan tests => repeat_each() * (blocks() * 2 + 4); + +#no_diff(); +no_long_string(); +run_tests(); + +__DATA__ + +=== TEST 1: simple logging +--- stream_config + upstream backend { + server 0.0.0.1:80; + balancer_by_lua_block { + print("hello from balancer by lua!") + } + } +--- stream_server_config + proxy_pass backend; +--- error_log eval +[ +'[lua] balancer_by_lua:2: hello from balancer by lua! while connecting to upstream,', +qr{\[crit\] .*? connect\(\) to 0\.0\.0\.1:80 failed .*?, upstream: "0\.0\.0\.1:80"}, +] + + + +=== TEST 2: simple logging (by_lua_file) +--- stream_config + upstream backend { + server 0.0.0.1:80; + balancer_by_lua_file html/a.lua; + } +--- stream_server_config + proxy_pass backend; +--- user_files +>>> a.lua +print("hello from balancer by lua!") +--- error_log eval +[ +'[lua] a.lua:1: hello from balancer by lua! while connecting to upstream,', +qr{\[crit\] .*? connect\(\) to 0\.0\.0\.1:80 failed .*?, upstream: "0\.0\.0\.1:80"}, +] + + + +=== TEST 3: cosockets are disabled +--- stream_config + upstream backend { + server 0.0.0.1:80; + balancer_by_lua_block { + local sock, err = ngx.socket.tcp() + } + } +--- stream_server_config + proxy_pass backend; +--- error_log eval +qr/\[error\] .*? failed to run balancer_by_lua\*: balancer_by_lua:2: API disabled in the context of balancer_by_lua\*/ + + + +=== TEST 4: ngx.exit ERROR works +--- stream_config + upstream backend { + server 0.0.0.1:80; + balancer_by_lua_block { + print("hello from balancer by lua!") + ngx.exit(ngx.ERROR) + } + } +--- stream_server_config + proxy_pass backend; +--- error_log +[lua] balancer_by_lua:2: hello from balancer by lua! while connecting to upstream + + + +=== TEST 5: ngx.exit OK works +--- stream_config + upstream backend { + server 0.0.0.1:80; + balancer_by_lua_block { + print("hello from balancer by lua!") + ngx.exit(ngx.OK) + } + } +--- stream_server_config + proxy_pass backend; +--- error_log eval +[ +'[lua] balancer_by_lua:2: hello from balancer by lua! while connecting to upstream,', +qr{\[crit\] .*? connect\(\) to 0\.0\.0\.1:80 failed .*?, upstream: "0\.0\.0\.1:80"}, +] + + + +=== TEST 6: ngx.sleep is disabled +--- stream_config + upstream backend { + server 0.0.0.1:80; + balancer_by_lua_block { + ngx.sleep(0.1) + } + } +--- stream_server_config + proxy_pass backend; +--- error_log eval +qr/\[error\] .*? failed to run balancer_by_lua\*: balancer_by_lua:2: API disabled in the context of balancer_by_lua\*/ + + + +=== TEST 7: get_phase +--- stream_config + upstream backend { + server 0.0.0.1:80; + balancer_by_lua_block { + print("I am in phase ", ngx.get_phase()) + } + } +--- stream_server_config + proxy_pass backend; +--- grep_error_log eval: qr/I am in phase \w+/ +--- grep_error_log_out +I am in phase balancer +--- error_log eval +qr{\[crit\] .*? connect\(\) to 0\.0\.0\.1:80 failed .*?, upstream: "0\.0\.0\.1:80"}