Commit 5257fcaa authored by Philipp Schafft's avatar Philipp Schafft 🦁

Merge branch 'fix-locks'

parents 231e60a4 53d8b7c3
Pipeline #322 failed with stage
in 12 seconds
...@@ -197,7 +197,9 @@ void auth_release (auth_t *authenticator) { ...@@ -197,7 +197,9 @@ void auth_release (auth_t *authenticator) {
/* cleanup auth thread attached to this auth */ /* cleanup auth thread attached to this auth */
if (authenticator->running) { if (authenticator->running) {
authenticator->running = 0; authenticator->running = 0;
thread_mutex_unlock(&authenticator->lock);
thread_join(authenticator->thread); thread_join(authenticator->thread);
thread_mutex_lock(&authenticator->lock);
} }
if (authenticator->free) if (authenticator->free)
...@@ -389,15 +391,18 @@ static void *auth_run_thread (void *arg) ...@@ -389,15 +391,18 @@ static void *auth_run_thread (void *arg)
auth_t *auth = arg; auth_t *auth = arg;
ICECAST_LOG_INFO("Authentication thread started"); ICECAST_LOG_INFO("Authentication thread started");
while (auth->running) while (1) {
{ thread_mutex_lock(&auth->lock);
/* usually no clients are waiting, so don't bother taking locks */
if (auth->head) if (!auth->running) {
{ thread_mutex_unlock(&auth->lock);
break;
}
if (auth->head) {
auth_client *auth_user; auth_client *auth_user;
/* may become NULL before lock taken */ /* may become NULL before lock taken */
thread_mutex_lock (&auth->lock);
auth_user = (auth_client*)auth->head; auth_user = (auth_client*)auth->head;
if (auth_user == NULL) if (auth_user == NULL)
{ {
...@@ -415,6 +420,8 @@ static void *auth_run_thread (void *arg) ...@@ -415,6 +420,8 @@ static void *auth_run_thread (void *arg)
__handle_auth_client(auth, auth_user); __handle_auth_client(auth, auth_user);
continue; continue;
} else {
thread_mutex_unlock(&auth->lock);
} }
thread_sleep (150000); thread_sleep (150000);
} }
...@@ -1037,7 +1044,7 @@ auth_t *auth_stack_get(auth_stack_t *stack) { ...@@ -1037,7 +1044,7 @@ auth_t *auth_stack_get(auth_stack_t *stack) {
if (!stack) if (!stack)
return NULL; return NULL;
thread_mutex_unlock(&stack->lock); thread_mutex_lock(&stack->lock);
auth_addref(auth = stack->auth); auth_addref(auth = stack->auth);
thread_mutex_unlock(&stack->lock); thread_mutex_unlock(&stack->lock);
return auth; return auth;
......
...@@ -1171,8 +1171,8 @@ static int _handle_resources(client_t *client, char **uri) ...@@ -1171,8 +1171,8 @@ static int _handle_resources(client_t *client, char **uri)
} }
} }
listen_sock = listensocket_get_listener(client->con->listensocket_effective);
config = config_get_config(); config = config_get_config();
listen_sock = listensocket_get_listener(client->con->listensocket_effective);
if (listen_sock) { if (listen_sock) {
serverhost = listen_sock->bind_address; serverhost = listen_sock->bind_address;
serverport = listen_sock->port; serverport = listen_sock->port;
...@@ -1341,6 +1341,7 @@ static void _handle_authed_client(client_t *client, void *userdata, auth_result ...@@ -1341,6 +1341,7 @@ static void _handle_authed_client(client_t *client, void *userdata, auth_result
static void _handle_authentication_global(client_t *client, void *userdata, auth_result result) static void _handle_authentication_global(client_t *client, void *userdata, auth_result result)
{ {
ice_config_t *config; ice_config_t *config;
auth_stack_t *authstack;
auth_stack_release(client->authstack); auth_stack_release(client->authstack);
client->authstack = NULL; client->authstack = NULL;
...@@ -1353,8 +1354,11 @@ static void _handle_authentication_global(client_t *client, void *userdata, auth ...@@ -1353,8 +1354,11 @@ static void _handle_authentication_global(client_t *client, void *userdata, auth
ICECAST_LOG_DEBUG("Trying global authenticators for client %p.", client); ICECAST_LOG_DEBUG("Trying global authenticators for client %p.", client);
config = config_get_config(); config = config_get_config();
auth_stack_add_client(config->authstack, client, _handle_authed_client, userdata); authstack = config->authstack;
auth_stack_addref(authstack);
config_release_config(); config_release_config();
auth_stack_add_client(authstack, client, _handle_authed_client, userdata);
auth_stack_release(authstack);
} }
static inline mount_proxy * __find_non_admin_mount(ice_config_t *config, const char *name, mount_type type) static inline mount_proxy * __find_non_admin_mount(ice_config_t *config, const char *name, mount_type type)
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include "curl.h" #include "curl.h"
#include "cfgfile.h" #include "cfgfile.h"
#include "global.h"
#include "logging.h" #include "logging.h"
#define CATMODULE "curl" #define CATMODULE "curl"
...@@ -48,7 +49,6 @@ int icecast_curl_shutdown(void) ...@@ -48,7 +49,6 @@ int icecast_curl_shutdown(void)
CURL *icecast_curl_new(const char *url, char * errors) CURL *icecast_curl_new(const char *url, char * errors)
{ {
ice_config_t *config;
CURL *curl = curl_easy_init(); CURL *curl = curl_easy_init();
if (!curl) if (!curl)
...@@ -77,9 +77,7 @@ CURL *icecast_curl_new(const char *url, char * errors) ...@@ -77,9 +77,7 @@ CURL *icecast_curl_new(const char *url, char * errors)
if (errors) if (errors)
curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errors); curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errors);
config = config_get_config(); curl_easy_setopt(curl, CURLOPT_USERAGENT, ICECAST_VERSION_STRING);
curl_easy_setopt(curl, CURLOPT_USERAGENT, config->server_id);
config_release_config();
return curl; return curl;
} }
......
...@@ -36,7 +36,9 @@ static thread_type *event_thread = NULL; ...@@ -36,7 +36,9 @@ static thread_type *event_thread = NULL;
static void event_addref(event_t *event) { static void event_addref(event_t *event) {
if (!event) if (!event)
return; return;
thread_mutex_lock(&event_lock);
event->refcount++; event->refcount++;
thread_mutex_unlock(&event_lock);
} }
static void event_release(event_t *event) { static void event_release(event_t *event) {
...@@ -44,9 +46,13 @@ static void event_release(event_t *event) { ...@@ -44,9 +46,13 @@ static void event_release(event_t *event) {
if (!event) if (!event)
return; return;
thread_mutex_lock(&event_lock);
event->refcount--; event->refcount--;
if (event->refcount) if (event->refcount) {
thread_mutex_unlock(&event_lock);
return; return;
}
for (i = 0; i < (sizeof(event->reglist)/sizeof(*event->reglist)); i++) for (i = 0; i < (sizeof(event->reglist)/sizeof(*event->reglist)); i++)
event_registration_release(event->reglist[i]); event_registration_release(event->reglist[i]);
...@@ -60,6 +66,7 @@ static void event_release(event_t *event) { ...@@ -60,6 +66,7 @@ static void event_release(event_t *event) {
event_release(event->next); event_release(event->next);
free(event); free(event);
thread_mutex_unlock(&event_lock);
} }
static void event_push(event_t **event, event_t *next) { static void event_push(event_t **event, event_t *next) {
...@@ -78,7 +85,7 @@ static void event_push(event_t **event, event_t *next) { ...@@ -78,7 +85,7 @@ static void event_push(event_t **event, event_t *next) {
return; return;
} }
event_addref(*event = next); *event = next;
} }
static void event_push_reglist(event_t *event, event_registration_t *reglist) { static void event_push_reglist(event_t *event, event_registration_t *reglist) {
...@@ -336,6 +343,7 @@ void event_registration_push(event_registration_t **er, event_registration_t *ta ...@@ -336,6 +343,7 @@ void event_registration_push(event_registration_t **er, event_registration_t *ta
/* event signaling */ /* event signaling */
void event_emit(event_t *event) { void event_emit(event_t *event) {
fastevent_emit(FASTEVENT_TYPE_SLOWEVENT, FASTEVENT_FLAG_NONE, FASTEVENT_DATATYPE_EVENT, event); fastevent_emit(FASTEVENT_TYPE_SLOWEVENT, FASTEVENT_FLAG_NONE, FASTEVENT_DATATYPE_EVENT, event);
event_addref(event);
thread_mutex_lock(&event_lock); thread_mutex_lock(&event_lock);
event_push(&event_queue, event); event_push(&event_queue, event);
thread_mutex_unlock(&event_lock); thread_mutex_unlock(&event_lock);
......
...@@ -72,7 +72,7 @@ static int slave_running = 0; ...@@ -72,7 +72,7 @@ static int slave_running = 0;
static volatile int update_settings = 0; static volatile int update_settings = 0;
static volatile int update_all_mounts = 0; static volatile int update_all_mounts = 0;
static volatile unsigned int max_interval = 0; static volatile unsigned int max_interval = 0;
static mutex_t _slave_mutex; // protects update_settings, update_all_mounts, max_interval static mutex_t _slave_mutex; // protects slave_running, update_settings, update_all_mounts, max_interval
static inline void relay_config_upstream_free (relay_config_upstream_t *upstream) static inline void relay_config_upstream_free (relay_config_upstream_t *upstream)
{ {
...@@ -222,9 +222,14 @@ void slave_initialize(void) ...@@ -222,9 +222,14 @@ void slave_initialize(void)
void slave_shutdown(void) void slave_shutdown(void)
{ {
if (!slave_running) thread_mutex_lock(&_slave_mutex);
if (!slave_running) {
thread_mutex_unlock(&_slave_mutex);
return; return;
}
slave_running = 0; slave_running = 0;
thread_mutex_unlock(&_slave_mutex);
ICECAST_LOG_DEBUG("waiting for slave thread"); ICECAST_LOG_DEBUG("waiting for slave thread");
thread_join (_slave_thread_id); thread_join (_slave_thread_id);
} }
...@@ -895,8 +900,12 @@ static void *_slave_thread(void *arg) ...@@ -895,8 +900,12 @@ static void *_slave_thread(void *arg)
global_unlock(); global_unlock();
thread_sleep(1000000); thread_sleep(1000000);
if (slave_running == 0) thread_mutex_lock(&_slave_mutex);
if (slave_running == 0) {
thread_mutex_unlock(&_slave_mutex);
break; break;
}
thread_mutex_unlock(&_slave_mutex);
++interval; ++interval;
......
...@@ -154,7 +154,9 @@ void stats_shutdown(void) ...@@ -154,7 +154,9 @@ void stats_shutdown(void)
return; return;
/* wait for thread to exit */ /* wait for thread to exit */
thread_mutex_lock(&_stats_mutex);
_stats_running = 0; _stats_running = 0;
thread_mutex_unlock(&_stats_mutex);
thread_join(_stats_thread_id); thread_join(_stats_thread_id);
/* wait for other threads to shut down */ /* wait for other threads to shut down */
...@@ -691,7 +693,14 @@ static void *_stats_thread(void *arg) ...@@ -691,7 +693,14 @@ static void *_stats_thread(void *arg)
stats_event (NULL, "listener_connections", "0"); stats_event (NULL, "listener_connections", "0");
ICECAST_LOG_INFO("stats thread started"); ICECAST_LOG_INFO("stats thread started");
while (_stats_running) { while (1) {
thread_mutex_lock(&_stats_mutex);
if (!_stats_running) {
thread_mutex_unlock(&_stats_mutex);
break;
}
thread_mutex_unlock(&_stats_mutex);
thread_mutex_lock(&_global_event_mutex); thread_mutex_lock(&_global_event_mutex);
if (_global_event_queue.head != NULL) { if (_global_event_queue.head != NULL) {
/* grab the next event from the queue */ /* grab the next event from the queue */
...@@ -834,6 +843,10 @@ static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, i ...@@ -834,6 +843,10 @@ static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, i
xmlNodePtr ret = NULL; xmlNodePtr ret = NULL;
ice_config_t *config; ice_config_t *config;
config = config_get_config();
__add_authstack(config->authstack, root);
config_release_config();
thread_mutex_lock(&_stats_mutex); thread_mutex_lock(&_stats_mutex);
/* general stats first */ /* general stats first */
avlnode = avl_get_first(_stats.global_tree); avlnode = avl_get_first(_stats.global_tree);
...@@ -846,9 +859,6 @@ static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, i ...@@ -846,9 +859,6 @@ static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, i
} }
/* now per mount stats */ /* now per mount stats */
avlnode = avl_get_first(_stats.source_tree); avlnode = avl_get_first(_stats.source_tree);
config = config_get_config();
__add_authstack(config->authstack, root);
config_release_config();
while (avlnode) { while (avlnode) {
stats_source_t *source = (stats_source_t *)avlnode->key; stats_source_t *source = (stats_source_t *)avlnode->key;
...@@ -971,7 +981,14 @@ void *stats_connection(void *arg) ...@@ -971,7 +981,14 @@ void *stats_connection(void *arg)
_register_listener (&listener); _register_listener (&listener);
while (_stats_running) { while (1) {
thread_mutex_lock(&_stats_mutex);
if (!_stats_running) {
thread_mutex_unlock(&_stats_mutex);
break;
}
thread_mutex_unlock(&_stats_mutex);
thread_mutex_lock (&listener.mutex); thread_mutex_lock (&listener.mutex);
event = _get_event_from_queue (&listener.queue); event = _get_event_from_queue (&listener.queue);
thread_mutex_unlock (&listener.mutex); thread_mutex_unlock (&listener.mutex);
......
...@@ -271,7 +271,9 @@ void yp_recheck_config (ice_config_t *config) ...@@ -271,7 +271,9 @@ void yp_recheck_config (ice_config_t *config)
} }
} }
thread_rwlock_unlock (&yp_lock); thread_rwlock_unlock (&yp_lock);
thread_rwlock_wlock(&yp_lock);
yp_update = 1; yp_update = 1;
thread_rwlock_unlock(&yp_lock);
} }
...@@ -721,10 +723,12 @@ static void delete_marked_yp(struct yp_server *server) ...@@ -721,10 +723,12 @@ static void delete_marked_yp(struct yp_server *server)
static void *yp_update_thread(void *arg) static void *yp_update_thread(void *arg)
{ {
ICECAST_LOG_INFO("YP update thread started"); ICECAST_LOG_INFO("YP update thread started");
int running;
yp_running = 1; yp_running = 1;
while (yp_running) running = 1;
{
while (running) {
struct yp_server *server; struct yp_server *server;
thread_sleep (200000); thread_sleep (200000);
...@@ -738,11 +742,10 @@ static void *yp_update_thread(void *arg) ...@@ -738,11 +742,10 @@ static void *yp_update_thread(void *arg)
yp_process_server (server); yp_process_server (server);
server = server->next; server = server->next;
} }
thread_rwlock_unlock (&yp_lock);
/* update the local YP structure */ /* update the local YP structure */
if (yp_update) if (yp_update)
{ {
thread_rwlock_unlock(&yp_lock);
thread_rwlock_wlock (&yp_lock); thread_rwlock_wlock (&yp_lock);
check_servers (); check_servers ();
server = (struct yp_server *)active_yps; server = (struct yp_server *)active_yps;
...@@ -754,8 +757,9 @@ static void *yp_update_thread(void *arg) ...@@ -754,8 +757,9 @@ static void *yp_update_thread(void *arg)
server = server->next; server = server->next;
} }
yp_update = 0; yp_update = 0;
thread_rwlock_unlock (&yp_lock);
} }
running = yp_running;
thread_rwlock_unlock(&yp_lock);
} }
thread_rwlock_destroy (&yp_lock); thread_rwlock_destroy (&yp_lock);
thread_mutex_destroy (&yp_pending_lock); thread_mutex_destroy (&yp_pending_lock);
...@@ -984,8 +988,11 @@ void yp_touch (const char *mount) ...@@ -984,8 +988,11 @@ void yp_touch (const char *mount)
void yp_shutdown (void) void yp_shutdown (void)
{ {
thread_rwlock_wlock(&yp_lock);
yp_running = 0; yp_running = 0;
yp_update = 1; yp_update = 1;
thread_rwlock_unlock(&yp_lock);
if (yp_thread) if (yp_thread)
thread_join (yp_thread); thread_join (yp_thread);
free ((char*)server_version); free ((char*)server_version);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment