diff --git a/src/client.c b/src/client.c index 6d1232be23422619b9bfca50acd73634f0b579e4..bb43d8cd487136ac85af2ebba623d8951412f618 100644 --- a/src/client.c +++ b/src/client.c @@ -39,33 +39,34 @@ #undef CATMODULE #define CATMODULE "client" -client_t *client_create(connection_t *con, http_parser_t *parser) +/* should be called with global lock held */ +int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser) { - ice_config_t *config = config_get_config (); + ice_config_t *config; client_t *client = (client_t *)calloc(1, sizeof(client_t)); - int client_limit = config->client_limit; - config_release_config (); + int ret = -1; + + if (client == NULL) + return -1; + + config = config_get_config (); - global_lock(); - if (global.clients >= client_limit || client == NULL) - { - client_limit = global.clients; - global_unlock(); - free (client); - WARN1 ("server client limit reached (%d clients)", client_limit); - return NULL; - } global.clients++; - stats_event_args (NULL, "clients", "%d", global.clients); - global_unlock(); + if (config->client_limit < global.clients) + WARN2 ("server client limit reached (%d/%d)", config->client_limit, global.clients); + else + ret = 0; + config_release_config (); + + stats_event_args (NULL, "clients", "%d", global.clients); client->con = con; client->parser = parser; - client->refbuf = NULL; client->pos = 0; client->write_to_client = format_generic_write_to_client; + *c_ptr = client; - return client; + return ret; } void client_destroy(client_t *client) @@ -110,7 +111,23 @@ void client_destroy(client_t *client) /* helper function for reading data from a client */ int client_read_bytes (client_t *client, void *buf, unsigned len) { - int bytes = sock_read_bytes (client->con->sock, buf, len); + int bytes; + + if (client->refbuf && client->refbuf->len) + { + /* we have data to read from a refbuf first */ + if (client->refbuf->len < len) + len = client->refbuf->len; + memcpy (buf, client->refbuf->data, len); + if (client->refbuf->len < len) + { + char *ptr = client->refbuf->data; + memmove (ptr, ptr+len, client->refbuf->len - len); + } + client->refbuf->len -= len; + return len; + } + bytes = sock_read_bytes (client->con->sock, buf, len); if (bytes > 0) return bytes; diff --git a/src/client.h b/src/client.h index 0c97de45d09ae4916aea02092c5bae8557ac6f1e..d9216cea44532e1194540ac4387ee8d0e297aa9b 100644 --- a/src/client.h +++ b/src/client.h @@ -67,7 +67,7 @@ typedef struct _client_tag } client_t; -client_t *client_create(connection_t *con, http_parser_t *parser); +int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser); void client_destroy(client_t *client); void client_send_504(client_t *client, char *message); void client_send_404(client_t *client, char *message); diff --git a/src/connection.c b/src/connection.c index 02167c188554f07f5c2027bc99f86b61c45c62c6..439f08d4b671d3978de345a054363ae4ef91df66 100644 --- a/src/connection.c +++ b/src/connection.c @@ -80,10 +80,13 @@ #define SHOUTCAST_SOURCE_AUTH 1 #define ICECAST_SOURCE_AUTH 0 -typedef struct con_queue_tag { - connection_t *con; - struct con_queue_tag *next; -} con_queue_t; +typedef struct client_queue_tag { + client_t *client; + int offset; + int stream_offset; + int shoutcast; + struct client_queue_tag *next; +} client_queue_t; typedef struct _thread_queue_tag { thread_type *thread_id; @@ -93,11 +96,12 @@ typedef struct _thread_queue_tag { static mutex_t _connection_mutex; static volatile unsigned long _current_id = 0; static int _initialized = 0; +static thread_type *tid; -volatile static con_queue_t *_queue = NULL; -static mutex_t _queue_mutex; - -static thread_queue_t *_conhands = NULL; +static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue; +static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; +static mutex_t _con_queue_mutex; +static mutex_t _req_queue_mutex; rwlock_t _source_shutdown_rwlock; @@ -108,7 +112,8 @@ void connection_initialize(void) if (_initialized) return; thread_mutex_create(&_connection_mutex); - thread_mutex_create(&_queue_mutex); + thread_mutex_create(&_con_queue_mutex); + thread_mutex_create(&_req_queue_mutex); thread_mutex_create(&move_clients_mutex); thread_rwlock_create(&_source_shutdown_rwlock); thread_cond_create(&global.shutdown_cond); @@ -122,7 +127,8 @@ void connection_shutdown(void) thread_cond_destroy(&global.shutdown_cond); thread_rwlock_destroy(&_source_shutdown_rwlock); - thread_mutex_destroy(&_queue_mutex); + thread_mutex_destroy(&_con_queue_mutex); + thread_mutex_destroy(&_req_queue_mutex); thread_mutex_destroy(&_connection_mutex); thread_mutex_destroy(&move_clients_mutex); @@ -140,18 +146,18 @@ static unsigned long _next_connection_id(void) return id; } -connection_t *create_connection(sock_t sock, sock_t serversock, char *ip) { +connection_t *connection_create (sock_t sock, sock_t serversock, char *ip) +{ connection_t *con; - con = (connection_t *)malloc(sizeof(connection_t)); - memset(con, 0, sizeof(connection_t)); - con->sock = sock; - con->serversock = serversock; - con->con_time = time(NULL); - con->id = _next_connection_id(); - con->ip = ip; - - con->event_number = EVENT_NO_EVENT; - con->event = NULL; + con = (connection_t *)calloc(1, sizeof(connection_t)); + if (con) + { + con->sock = sock; + con->serversock = serversock; + con->con_time = time(NULL); + con->id = _next_connection_id(); + con->ip = ip; + } return con; } @@ -254,8 +260,11 @@ static connection_t *_accept_connection(void) ip = (char *)malloc(MAX_ADDR_LEN); sock = sock_accept(serversock, ip, MAX_ADDR_LEN); - if (sock >= 0) { - con = create_connection(sock, serversock, ip); + if (sock >= 0) + { + con = connection_create (sock, serversock, ip); + if (con == NULL) + free (ip); return con; } @@ -268,171 +277,207 @@ static connection_t *_accept_connection(void) return NULL; } -static void _add_connection(connection_t *con) -{ - con_queue_t *node; - - node = (con_queue_t *)malloc(sizeof(con_queue_t)); - - thread_mutex_lock(&_queue_mutex); - node->con = con; - node->next = (con_queue_t *)_queue; - _queue = node; - thread_mutex_unlock(&_queue_mutex); -} -static void _push_thread(thread_queue_t **queue, thread_type *thread_id) +/* add client to connection queue. At this point some header information + * has been collected, so we now pass it onto the connection thread for + * further processing + */ +static void _add_connection (client_queue_t *node) { - /* create item */ - thread_queue_t *item = (thread_queue_t *)malloc(sizeof(thread_queue_t)); - item->thread_id = thread_id; - item->next = NULL; - - - thread_mutex_lock(&_queue_mutex); - if (*queue == NULL) { - *queue = item; - } else { - item->next = *queue; - *queue = item; - } - thread_mutex_unlock(&_queue_mutex); + thread_mutex_lock (&_con_queue_mutex); + *_con_queue_tail = node; + _con_queue_tail = (volatile client_queue_t **)&node->next; + thread_mutex_unlock (&_con_queue_mutex); } -static thread_type *_pop_thread(thread_queue_t **queue) -{ - thread_type *id; - thread_queue_t *item; - thread_mutex_lock(&_queue_mutex); +/* this returns queued clients for the connection thread. headers are + * already provided, but need to be parsed. + */ +static client_queue_t *_get_connection(void) +{ + client_queue_t *node = NULL; - item = *queue; - if (item == NULL) { - thread_mutex_unlock(&_queue_mutex); - return NULL; + /* common case, no new connections so don't bother taking locks */ + if (_con_queue) + { + thread_mutex_lock (&_con_queue_mutex); + node = (client_queue_t *)_con_queue; + _con_queue = node->next; + if (_con_queue == NULL) + _con_queue_tail = &_con_queue; + thread_mutex_unlock (&_con_queue_mutex); } + return node; +} - *queue = item->next; - item->next = NULL; - id = item->thread_id; - free(item); - thread_mutex_unlock(&_queue_mutex); +/* run along queue checking for any data that has come in or a timeout */ +static void process_request_queue () +{ + client_queue_t **node_ref = (client_queue_t **)&_req_queue; + ice_config_t *config = config_get_config (); + int timeout = config->header_timeout; + config_release_config(); - return id; -} + while (*node_ref) + { + client_queue_t *node = *node_ref; + client_t *client = node->client; + int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset; + char *buf = client->refbuf->data + node->offset; -static void _build_pool(void) -{ - ice_config_t *config; - int i; - thread_type *tid; - char buff[64]; - int threadpool_size; + if (len > 0) + { + if (client->con->con_time + timeout <= time(NULL)) + len = 0; + else + len = client_read_bytes (client, buf, len); + } - config = config_get_config(); - threadpool_size = config->threadpool_size; - config_release_config(); + if (len > 0) + { + int pass_it = 1; + char *ptr; + + node->offset += len; + client->refbuf->data [node->offset] = '\000'; + do + { + if (node->shoutcast == 1) + { + /* password line */ + if (strstr (client->refbuf->data, "\r\n") != NULL) + break; + if (strstr (client->refbuf->data, "\n") != NULL) + break; + } + /* stream_offset refers to the start of any data sent after the + * http style headers, we don't want to lose those */ + ptr = strstr (client->refbuf->data, "\r\n\r\n"); + if (ptr) + { + node->stream_offset = (ptr+4) - client->refbuf->data; + break; + } + ptr = strstr (client->refbuf->data, "\n\n"); + if (ptr) + { + node->stream_offset = (ptr+2) - client->refbuf->data; + break; + } + pass_it = 0; + } while (0); - for (i = 0; i < threadpool_size; i++) { - snprintf(buff, 64, "Connection Thread #%d", i); - tid = thread_create(buff, _handle_connection, NULL, THREAD_ATTACHED); - _push_thread(&_conhands, tid); + if (pass_it) + { + if ((client_queue_t **)_req_queue_tail == &(node->next)) + _req_queue_tail = (volatile client_queue_t **)node_ref; + *node_ref = node->next; + node->next = NULL; + _add_connection (node); + } + } + else + { + if (len == 0 || client->con->error) + { + if ((client_queue_t **)_req_queue_tail == &node->next) + _req_queue_tail = (volatile client_queue_t **)node_ref; + *node_ref = node->next; + client_destroy (client); + free (node); + continue; + } + } + node_ref = &node->next; } } -static void _destroy_pool(void) -{ - thread_type *id; - int i; - - i = 0; - id = _pop_thread(&_conhands); - while (id != NULL) { - thread_join(id); - id = _pop_thread(&_conhands); - } - INFO0("All connection threads down"); +/* add node to the queue of requests. This is where the clients are when + * initial http details are read. + */ +static void _add_request_queue (client_queue_t *node) +{ + thread_mutex_lock (&_req_queue_mutex); + *_req_queue_tail = node; + _req_queue_tail = (volatile client_queue_t **)&node->next; + thread_mutex_unlock (&_req_queue_mutex); } + void connection_accept_loop(void) { connection_t *con; - _build_pool(); + tid = thread_create ("connection thread", _handle_connection, NULL, THREAD_ATTACHED); while (global.running == ICE_RUNNING) { - if (global . schedule_config_reread) + con = _accept_connection(); + + if (con) { - /* reread config file */ - INFO0("Scheduling config reread ..."); + client_queue_t *node; + ice_config_t *config; + int i; + client_t *client = NULL; - connection_inject_event(EVENT_CONFIG_READ, NULL); - global . schedule_config_reread = 0; - } + global_lock(); + if (client_create (&client, con, NULL) < 0) + { + global_unlock(); + client_send_404 (client, "Icecast connection limit reached"); + continue; + } + global_unlock(); - con = _accept_connection(); + /* setup client for reading incoming http */ + client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE); + client->refbuf->data [PER_CLIENT_REFBUF_SIZE-1] = '\000'; + client->refbuf->len = 0; /* force reader code to ignore buffer */ + + node = calloc (1, sizeof (client_queue_t)); + if (node == NULL) + { + client_destroy (client); + continue; + } + node->client = client; + + /* Check for special shoutcast compatability processing */ + config = config_get_config(); + for (i = 0; i < global.server_sockets; i++) + { + if (global.serversock[i] == con->serversock) + { + if (config->listeners[i].shoutcast_compat) + node->shoutcast = 1; + } + } + config_release_config(); + + sock_set_blocking (client->con->sock, SOCK_NONBLOCK); + sock_set_nodelay (client->con->sock); - if (con) { - _add_connection(con); + _add_request_queue (node); + stats_event_inc (NULL, "connections"); } + process_request_queue (); } /* Give all the other threads notification to shut down */ thread_cond_broadcast(&global.shutdown_cond); - _destroy_pool(); + if (tid) + thread_join (tid); /* wait for all the sources to shutdown */ thread_rwlock_wlock(&_source_shutdown_rwlock); thread_rwlock_unlock(&_source_shutdown_rwlock); } -static connection_t *_get_connection(void) -{ - con_queue_t *node = NULL; - con_queue_t *oldnode = NULL; - connection_t *con = NULL; - - /* common case, no new connections so don't bother taking locks */ - if (_queue == NULL) - return NULL; - - thread_mutex_lock(&_queue_mutex); - if (_queue) { - node = (con_queue_t *)_queue; - while (node->next) { - oldnode = node; - node = node->next; - } - - /* node is now the last node - ** and oldnode is the previous one, or NULL - */ - if (oldnode) oldnode->next = NULL; - else (_queue) = NULL; - } - thread_mutex_unlock(&_queue_mutex); - - if (node) { - con = node->con; - free(node); - } - - return con; -} - -void connection_inject_event(int eventnum, void *event_data) { - connection_t *con = calloc(1, sizeof(connection_t)); - - con->event_number = eventnum; - con->event = event_data; - - _add_connection(con); -} - /* Called when activating a source. Verifies that the source count is not * exceeded and applies any initial parameters. @@ -484,10 +529,6 @@ int connection_complete_source (source_t *source) return -1; } - global.sources++; - stats_event_args (NULL, "sources", "%d", global.sources); - global_unlock(); - /* for relays, we don't yet have a client, however we do require one * to retrieve the stream from. This is created here, quite late, * because we can't use this client to return an error code/message, @@ -495,12 +536,9 @@ int connection_complete_source (source_t *source) */ if (source->client == NULL) { - source->client = client_create (source->con, source->parser); - if (source->client == NULL) + if (client_create (&source->client, source->con, source->parser) < 0) { config_release_config(); - global_lock(); - global.sources--; global_unlock(); connection_close (source->con); source->con = NULL; @@ -509,6 +547,9 @@ int connection_complete_source (source_t *source) return -1; } } + global.sources++; + stats_event_args (NULL, "sources", "%d", global.sources); + global_unlock(); source->running = 1; mountinfo = config_find_mount (config, source->mount); @@ -823,160 +864,141 @@ static void _handle_get_request (client_t *client, char *passed_uri) return; } - sock_set_blocking(client->con->sock, SOCK_NONBLOCK); - sock_set_nodelay(client->con->sock); - - client->write_to_client = format_generic_write_to_client; - client->check_buffer = format_check_http_buffer; - client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE); - add_client (uri, client); if (uri != passed_uri) free (uri); } -void _handle_shoutcast_compatible(connection_t *con, char *mount, char *source_password) { - char shoutcast_password[256]; +static void _handle_shoutcast_compatible (client_queue_t *node) +{ char *http_compliant; int http_compliant_len = 0; - char header[4096]; http_parser_t *parser; + ice_config_t *config = config_get_config (); + char *shoutcast_mount; + client_t *client = node->client; - memset(shoutcast_password, 0, sizeof (shoutcast_password)); - /* Step one of shoutcast auth protocol, read encoder password (1 line) */ - if (util_read_header(con->sock, shoutcast_password, - sizeof (shoutcast_password), - READ_LINE) == 0) { - /* either we didn't get a complete line, or we timed out */ - connection_close(con); - return; - } - /* Get rid of trailing \n */ - shoutcast_password[strlen(shoutcast_password)-1] = '\000'; - if (strcmp(shoutcast_password, source_password)) { - ERROR0("Invalid source password"); - connection_close(con); - return; - } - /* Step two of shoutcast auth protocol, send OK2. For those - interested, OK2 means it supports metadata updates via admin.cgi, - and the string "OK" can also be sent, but will indicate to the - shoutcast source client to not send metadata updates. - I believe icecast 1.x used to send OK. */ - sock_write(con->sock, "%s\r\n", "OK2"); - - memset(header, 0, sizeof (header)); - /* Step three of shoutcast auth protocol, read HTTP-style - request headers and process them.*/ - if (util_read_header(con->sock, header, sizeof (header), - READ_ENTIRE_HEADER) == 0) { - /* either we didn't get a complete header, or we timed out */ - connection_close(con); + if (node->shoutcast == 1) + { + char *source_password, *ptr; + mount_proxy *mountinfo = config_find_mount (config, config->shoutcast_mount); + + if (mountinfo && mountinfo->password) + source_password = strdup (mountinfo->password); + else + source_password = strdup (config->source_password); + config_release_config(); + + /* Get rid of trailing \r\n or \n after password */ + ptr = strstr (client->refbuf->data, "\r\n"); + if (ptr == NULL) + ptr = strstr (client->refbuf->data, "\n"); + + if (ptr == NULL) + { + client_destroy (client); + free (source_password); + free (node); + return; + } + *ptr = '\0'; + + if (strcmp (client->refbuf->data, source_password) == 0) + { + client->respcode = 200; + /* send this non-blocking but if there is only a partial write + * then leave to header timeout */ + sock_write (client->con->sock, "OK2\r\n"); + memset (client->refbuf->data, 0, client->refbuf->len); + node->shoutcast = 2; + node->offset = 0; + /* we've checked the password, now send it back for reading headers */ + _add_request_queue (node); + free (source_password); + return; + } + client_destroy (client); + free (node); return; } + shoutcast_mount = strdup (config->shoutcast_mount); + config_release_config(); /* Here we create a valid HTTP request based of the information that was passed in via the non-HTTP style protocol above. This means we can use some of our existing code to handle this case */ - http_compliant_len = strlen(header) + strlen(mount) + 20; + http_compliant_len = 20 + strlen (shoutcast_mount) + node->offset; http_compliant = (char *)calloc(1, http_compliant_len); snprintf (http_compliant, http_compliant_len, - "SOURCE %s HTTP/1.0\r\n%s", mount, header); + "SOURCE %s HTTP/1.0\r\n%s", shoutcast_mount, client->refbuf->data); parser = httpp_create_parser(); httpp_initialize(parser, NULL); if (httpp_parse (parser, http_compliant, strlen(http_compliant))) { - client_t *client = client_create (con, parser); - if (client) + /* we may have more than just headers, so prepare for it */ + if (node->stream_offset == node->offset) + client->refbuf->len = 0; + else { - _handle_source_request (client, mount, SHOUTCAST_SOURCE_AUTH); - free (http_compliant); - return; + char *ptr = client->refbuf->data; + client->refbuf->len = node->offset - node->stream_offset; + memmove (ptr, ptr + node->stream_offset, client->refbuf->len); } + client->parser = parser; + _handle_source_request (client, shoutcast_mount, SHOUTCAST_SOURCE_AUTH); } - connection_close (con); - httpp_destroy (parser); + else + client_destroy (client); free (http_compliant); + free (shoutcast_mount); + free (node); + return; } + +/* Connection thread. Here we take clients off the connection queue and check + * the contents provided. We set up the parser then hand off to the specific + * request handler. + */ static void *_handle_connection(void *arg) { - char header[4096]; - connection_t *con; http_parser_t *parser; char *rawuri, *uri; - client_t *client; - int i = 0; - int continue_flag = 0; - ice_config_t *config; - char *source_password; while (global.running == ICE_RUNNING) { - /* grab a connection and set the socket to blocking */ - while ((con = _get_connection())) { + client_queue_t *node = _get_connection(); - /* Handle meta-connections */ - if(con->event_number > 0) { - switch(con->event_number) { - case EVENT_CONFIG_READ: - event_config_read(con->event); - break; - default: - ERROR1("Unknown event number: %d", con->event_number); - break; - } - free(con); - continue; - } - - stats_event_inc(NULL, "connections"); - - sock_set_blocking(con->sock, SOCK_BLOCK); + if (node) + { + client_t *client = node->client; - continue_flag = 0; /* Check for special shoutcast compatability processing */ - for(i = 0; i < MAX_LISTEN_SOCKETS; i++) { - if(global.serversock[i] == con->serversock) { - config = config_get_config(); - if (config->listeners[i].shoutcast_compat) { - char *shoutcast_mount = strdup (config->shoutcast_mount); - mount_proxy *mountinfo = config_find_mount (config, config->shoutcast_mount); - if (mountinfo && mountinfo->password) - source_password = strdup (mountinfo->password); - else - source_password = strdup (config->source_password); - config_release_config(); - _handle_shoutcast_compatible(con, shoutcast_mount, source_password); - free(source_password); - free (shoutcast_mount); - continue_flag = 1; - break; - } - config_release_config(); - } - } - if(continue_flag) { - continue; - } - - /* fill header with the http header */ - memset(header, 0, sizeof (header)); - if (util_read_header(con->sock, header, sizeof (header), - READ_ENTIRE_HEADER) == 0) { - /* either we didn't get a complete header, or we timed out */ - connection_close(con); + if (node->shoutcast) + { + _handle_shoutcast_compatible (node); continue; } + /* process normal HTTP headers */ parser = httpp_create_parser(); httpp_initialize(parser, NULL); - if (httpp_parse(parser, header, strlen(header))) { - /* handle the connection or something */ + client->parser = parser; + if (httpp_parse (parser, client->refbuf->data, node->offset)) + { + /* we may have more than just headers, so prepare for it */ + if (node->stream_offset == node->offset) + client->refbuf->len = 0; + else + { + char *ptr = client->refbuf->data; + client->refbuf->len = node->offset - node->stream_offset; + memmove (ptr, ptr + node->stream_offset, client->refbuf->len); + } + free (node); if (strcmp("ICE", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) && strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) { ERROR0("Bad HTTP protocol detected"); - connection_close(con); - httpp_destroy(parser); + client_destroy (client); continue; } @@ -985,19 +1007,7 @@ static void *_handle_connection(void *arg) if (uri == NULL) { - sock_write(con->sock, "The path you requested was invalid\r\n"); - connection_close(con); - httpp_destroy(parser); - continue; - } - client = client_create (con, parser); - if (client == NULL) - { - sock_write (con->sock, "HTTP/1.0 404 File Not Found\r\n" - "Content-Type: text/html\r\n\r\n" - "Connection limit reached"); - connection_close(con); - httpp_destroy(parser); + client_destroy (client); continue; } @@ -1016,16 +1026,16 @@ static void *_handle_connection(void *arg) } free(uri); - continue; } - else { + else + { + free (node); ERROR0("HTTP request parsing failed"); - connection_close(con); - httpp_destroy(parser); - continue; + client_destroy (client); } + continue; } - thread_sleep (100000); + thread_sleep (50000); } DEBUG0 ("Connection thread done"); diff --git a/src/connection.h b/src/connection.h index 086efcf093d5b1876f9866525f35d6c0ac08d924..d3f043a4fa2d6006319837bf96c27cb84016ba44 100644 --- a/src/connection.h +++ b/src/connection.h @@ -38,20 +38,15 @@ typedef struct connection_tag char *ip; char *host; - /* For 'fake' connections */ - int event_number; - void *event; } connection_t; void connection_initialize(void); void connection_shutdown(void); void connection_accept_loop(void); void connection_close(connection_t *con); -connection_t *create_connection(sock_t sock, sock_t serversock, char *ip); +connection_t *connection_create (sock_t sock, sock_t serversock, char *ip); int connection_complete_source (struct source_tag *source); -void connection_inject_event(int eventnum, void *event_data); - int connection_check_source_pass(http_parser_t *parser, const char *mount); int connection_check_relay_pass(http_parser_t *parser); int connection_check_admin_pass(http_parser_t *parser); diff --git a/src/fserve.c b/src/fserve.c index 603602dda14765f8b45d6812efc6a5d765506108..f00ae6bf7f3ec3e46eb15fbeb8b8c4faedc0b7db 100644 --- a/src/fserve.c +++ b/src/fserve.c @@ -571,9 +571,6 @@ int fserve_add_client (client_t *client, FILE *file) fclient->client = client; fclient->ready = 0; - sock_set_blocking (client->con->sock, SOCK_NONBLOCK); - sock_set_nodelay (client->con->sock); - thread_mutex_lock (&pending_lock); fclient->next = (fserve_t *)pending_list; pending_list = fclient; diff --git a/src/slave.c b/src/slave.c index c80d475e1e94b3ea95ce612174b7d4a3b4ce0992..e773909b4290dbd85c5a2307b72b4f9fcaa0d9b0 100644 --- a/src/slave.c +++ b/src/slave.c @@ -56,6 +56,7 @@ #include "logging.h" #include "source.h" #include "format.h" +#include "event.h" #define CATMODULE "slave" @@ -180,7 +181,7 @@ static void *start_relay_stream (void *arg) relay->server, relay->port, relay->mount); break; } - con = create_connection (streamsock, -1, NULL); + con = connection_create (streamsock, -1, NULL); if (relay->username && relay->password) { @@ -598,6 +599,13 @@ static void *_slave_thread(void *arg) { relay_server *cleanup_relays; + /* re-read xml file if requested */ + if (global . schedule_config_reread) + { + event_config_read (NULL); + global . schedule_config_reread = 0; + } + thread_sleep (1000000); if (slave_running == 0) break; diff --git a/src/source.c b/src/source.c index 4a9bd4d4b442035f30982f908b4cbebf330c3a61..b139ed68f01453b803eb7f7c9f185676a72744e3 100644 --- a/src/source.c +++ b/src/source.c @@ -602,9 +602,6 @@ static void source_init (source_t *source) stats_event (source->mount, "listener_peak", "0"); stats_event_time (source->mount, "stream_start"); - if (source->client->con) - sock_set_blocking (source->con->sock, SOCK_NONBLOCK); - DEBUG0("Source creation complete"); source->last_read = time (NULL); source->running = 1;