slave.c 23.9 KB
Newer Older
1 2 3 4 5
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17 18 19 20
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

21 22 23 24
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

25 26 27 28 29 30 31 32 33 34 35 36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#endif

37
#include "compat.h"
38

39
#include <libxml/uri.h>
Marvin Scholz's avatar
Marvin Scholz committed
40 41 42 43
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "common/net/sock.h"
#include "common/httpp/httpp.h"
44

45
#include "slave.h"
46
#include "cfgfile.h"
47 48 49 50 51 52 53 54
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
Michael Smith's avatar
Michael Smith committed
55
#include "format.h"
56 57 58 59

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
60
static thread_type *_slave_thread_id;
61
static int slave_running = 0;
62
static volatile int update_settings = 0;
63
static volatile int update_all_mounts = 0;
64
static volatile unsigned int max_interval = 0;
65
static mutex_t _slave_mutex; // protects update_settings, update_all_mounts, max_interval
66

67
/* Release a relay entry along with the source and the strings it holds.
 *
 * Returns the value relay->next had on entry, so a caller can walk and
 * free a list with `r = relay_free (r)`.
 */
relay_server *relay_free (relay_server *relay)
{
    relay_server *following = relay->next;

    ICECAST_LOG_DEBUG("freeing relay %s", relay->localmount);

    if (relay->source != NULL)
       source_free_source (relay->source);

    /* these three are released unconditionally */
    xmlFree (relay->server);
    xmlFree (relay->mount);
    xmlFree (relay->localmount);

    /* the credentials are only released when set */
    if (relay->username != NULL)
        xmlFree (relay->username);
    if (relay->password != NULL)
        xmlFree (relay->password);

    free (relay);

    return following;
}

84

85 86 87
/* Duplicate a relay description.
 *
 * Copies server, mount, localmount, the optional username/password and the
 * port, mp3metadata and on_demand settings. Every other field of the new
 * entry is left zeroed by calloc().
 *
 * Returns the new entry, or NULL if the allocation of the entry failed.
 */
relay_server *relay_copy (relay_server *r)
{
    relay_server *clone = calloc (1, sizeof (relay_server));

    if (clone == NULL)
        return NULL;

    /* plain settings */
    clone->port = r->port;
    clone->mp3metadata = r->mp3metadata;
    clone->on_demand = r->on_demand;

    /* strings that are always duplicated */
    clone->server = (char *)xmlCharStrdup (r->server);
    clone->mount = (char *)xmlCharStrdup (r->mount);
    clone->localmount = (char *)xmlCharStrdup (r->localmount);

    /* credentials are duplicated only when present */
    if (r->username != NULL)
        clone->username = (char *)xmlCharStrdup (r->username);
    if (r->password != NULL)
        clone->password = (char *)xmlCharStrdup (r->password);

    return clone;
}
104

105

106
/* force a recheck of the relays. This will recheck the master server if
107
 * this is a slave and rebuild all mountpoints in the stats tree
108
 */
109
void slave_update_all_mounts(void)
110
{
111
    thread_mutex_lock(&_slave_mutex);
112
    max_interval = 0;
113
    update_all_mounts = 1;
114
    update_settings = 1;
115
    thread_mutex_unlock(&_slave_mutex);
116 117
}

118 119 120

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
121
 */
122
void slave_rebuild_mounts(void)
123
{
124
    thread_mutex_lock(&_slave_mutex);
125
    update_settings = 1;
126
    thread_mutex_unlock(&_slave_mutex);
127 128
}

129 130

void slave_initialize(void)
Michael Smith's avatar
Michael Smith committed
131
{
132 133
    if (slave_running)
        return;
Michael Smith's avatar
Michael Smith committed
134

135
    slave_running = 1;
136
    max_interval = 0;
137
    thread_mutex_create (&_slave_mutex);
138 139
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
140

Michael Smith's avatar
Michael Smith committed
141

142 143 144
void slave_shutdown(void)
{
    if (!slave_running)
145
        return;
146
    slave_running = 0;
147
    ICECAST_LOG_DEBUG("waiting for slave thread");
148 149 150 151
    thread_join (_slave_thread_id);
}


152 153
/* Actually open the connection and do some http parsing, handle any 302
 * responses within here.
154
 */
155
static client_t *open_relay_connection (relay_server *relay)
156
{
157
    int redirects = 0;
158 159
    char *server_id = NULL;
    ice_config_t *config;
160 161
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
162 163 164 165
    char *server = strdup (relay->server);
    char *mount = strdup (relay->mount);
    int port = relay->port;
    char *auth_header;
166 167
    char header[4096];

168 169 170 171
    config = config_get_config ();
    server_id = strdup (config->server_id);
    config_release_config ();

172 173
    /* build any authentication header before connecting */
    if (relay->username && relay->password)
174
    {
175 176 177 178 179
        char *esc_authorisation;
        unsigned len = strlen(relay->username) + strlen(relay->password) + 2;

        auth_header = malloc (len);
        snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
180
        esc_authorisation = util_base64_encode(auth_header, len);
181 182 183 184 185 186 187 188 189
        free(auth_header);
        len = strlen (esc_authorisation) + 24;
        auth_header = malloc (len);
        snprintf (auth_header, len,
                "Authorization: Basic %s\r\n", esc_authorisation);
        free(esc_authorisation);
    }
    else
        auth_header = strdup ("");
190

191 192 193 194
    while (redirects < 10)
    {
        sock_t streamsock;

195
        ICECAST_LOG_INFO("connecting to %s:%d", server, port);
196

197
        streamsock = sock_connect_wto_bind (server, port, relay->bind, 10);
198 199
        if (streamsock == SOCK_ERROR)
        {
200
            ICECAST_LOG_WARN("Failed to connect to %s:%d", server, port);
201
            break;
Michael Smith's avatar
Michael Smith committed
202
        }
203
        con = connection_create (streamsock, -1, strdup (server));
204

205 206 207 208 209 210
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
211
                "User-Agent: %s\r\n"
Karl Heyes's avatar
Karl Heyes committed
212
                "Host: %s\r\n"
213
                "%s"
214
                "%s"
215
                "\r\n",
216
                mount,
217
                server_id,
Karl Heyes's avatar
Karl Heyes committed
218
                server,
219 220
                relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
                auth_header);
221
        memset (header, 0, sizeof(header));
222
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
223
        {
224
            ICECAST_LOG_ERROR("Header read failed for %s (%s:%d%s)", relay->localmount, server, port, mount);
225 226 227 228 229 230
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
        if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
        {
231
            ICECAST_LOG_ERROR("Error parsing relay request for %s (%s:%d%s)", relay->localmount,
232
                    server, port, mount);
233 234
            break;
        }
235
        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
236
        {
237 238 239
            /* better retry the connection again but with different details */
            const char *uri, *mountpoint;
            int len;
240

241
            uri = httpp_getvar (parser, "location");
242
            ICECAST_LOG_INFO("redirect received %s", uri);
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
            if (strncmp (uri, "http://", 7) != 0)
                break;
            uri += 7;
            mountpoint = strchr (uri, '/');
            free (mount);
            if (mountpoint)
                mount = strdup (mountpoint);
            else
                mount = strdup ("/");

            len = strcspn (uri, ":/");
            port = 80;
            if (uri [len] == ':')
                port = atoi (uri+len+1);
            free (server);
            server = calloc (1, len+1);
            strncpy (server, uri, len);
            connection_close (con);
            httpp_destroy (parser);
262 263 264
            con = NULL;
            parser = NULL;
        }
265 266 267 268 269 270
        else
        {
            client_t *client = NULL;

            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
            {
271
                ICECAST_LOG_ERROR("Error from relay request: %s (%s)", relay->localmount,
272 273 274 275 276 277 278 279 280 281 282 283 284 285
                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                break;
            }
            global_lock ();
            if (client_create (&client, con, parser) < 0)
            {
                global_unlock ();
                /* make sure only the client_destory frees these */
                con = NULL;
                parser = NULL;
                client_destroy (client);
                break;
            }
            global_unlock ();
286
            sock_set_blocking (streamsock, 0);
287 288 289
            client_set_queue (client, NULL);
            free (server);
            free (mount);
290
            free (server_id);
291 292 293 294 295 296 297 298 299
            free (auth_header);

            return client;
        }
        redirects++;
    }
    /* failed, better clean up */
    free (server);
    free (mount);
300
    free (server_id);
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
    free (auth_header);
    if (con)
        connection_close (con);
    if (parser)
        httpp_destroy (parser);
    return NULL;
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
static void *start_relay_stream (void *arg)
{
    relay_server *relay = arg;
    source_t *src = relay->source;
    client_t *client;

319
    ICECAST_LOG_INFO("Starting relayed source at mountpoint \"%s\"", relay->localmount);
320 321 322 323 324 325 326 327 328 329
    do
    {
        client = open_relay_connection (relay);

        if (client == NULL)
            continue;

        src->client = client;
        src->parser = client->parser;
        src->con = client->con;
330 331

        if (connection_complete_source (src, 0) < 0)
332
        {
333
            ICECAST_LOG_INFO("Failed to complete source initialisation");
334 335 336
            client_destroy (client);
            src->client = NULL;
            continue;
337
        }
338
        stats_event_inc(NULL, "source_relay_connections");
339
        stats_event (relay->localmount, "source_ip", client->con->ip);
340

341 342
        source_main (relay->source);

343 344 345 346 347
        if (relay->on_demand == 0)
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
            yp_remove (relay->localmount);
            relay->source->yp_public = -1;
348
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
349
            slave_update_all_mounts();
350 351
        }

352
        /* we've finished, now get cleaned up */
353
        relay->cleanup = 1;
354
        slave_rebuild_mounts();
355 356

        return NULL;
357
    } while (0); /* TODO allow looping through multiple servers */
358

359 360 361 362
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

363
        ICECAST_LOG_DEBUG("failed relay, fallback to %s", relay->source->fallback_mount);
364
        avl_tree_rlock(global.source_tree);
365
        fallback_source = source_find_mount(relay->source->fallback_mount);
366 367

        if (fallback_source != NULL)
368
            source_move_clients(relay->source, fallback_source);
369

370
        avl_tree_unlock(global.source_tree);
371 372
    }

373
    source_clear_source(relay->source);
374

375
    /* cleanup relay, but prevent this relay from starting up again too soon */
376 377
    thread_mutex_lock(&_slave_mutex);
    thread_mutex_lock(&(config_locks()->relay_lock));
Karl Heyes's avatar
Karl Heyes committed
378
    relay->source->on_demand = 0;
379
    relay->start = time(NULL) + max_interval;
380
    relay->cleanup = 1;
381 382
    thread_mutex_unlock(&(config_locks()->relay_lock));
    thread_mutex_unlock(&_slave_mutex);
383 384

    return NULL;
385 386 387 388 389 390 391 392
}


/* Wrapper for starting the provided relay stream.
 *
 * Reserves the source for a relay that has none yet, starts the relay
 * thread when the relay is idle and due (and, for on-demand relays, when it
 * has been requested or its overriding fallback has listeners), and
 * otherwise joins a relay thread that has flagged itself for cleanup.
 */
static void check_relay_stream (relay_server *relay)
{
    source_t *source;
    int launch = 0;

    if (relay->source == NULL)
    {
        if (relay->localmount[0] != '/')
        {
            ICECAST_LOG_WARN("relay mountpoint \"%s\" does not start with /, skipping",
                    relay->localmount);
            return;
        }

        /* new relay, reserve the name */
        relay->source = source_reserve (relay->localmount);
        if (relay->source == NULL)
        {
            /* start doubles as a "warned already" marker here */
            if (relay->start == 0)
            {
                ICECAST_LOG_WARN("new relay but source \"%s\" already exists", relay->localmount);
                relay->start = 1;
            }
            return;
        }

        ICECAST_LOG_DEBUG("Adding relay source at mountpoint \"%s\"", relay->localmount);
        if (relay->on_demand)
        {
            ice_config_t *config = config_get_config ();
            mount_proxy *mountinfo = config_find_mount (config, relay->localmount, MOUNT_TYPE_NORMAL);

            relay->source->on_demand = relay->on_demand;
            if (mountinfo == NULL)
                source_update_settings (config, relay->source, mountinfo);
            config_release_config ();
            stats_event (relay->localmount, "listeners", "0");
            slave_update_all_mounts();
        }
    }

    source = relay->source;

    /* skip relay if active, not configured or just not time yet */
    if (source != NULL && !relay->running && relay->start <= time(NULL))
    {
        launch = 1;

        /* check if an inactive on-demand relay has a fallback that has listeners */
        if (relay->on_demand && source->on_demand_req == 0)
        {
            source->on_demand = relay->on_demand;

            if (source->fallback_mount && source->fallback_override)
            {
                source_t *fallback;

                avl_tree_rlock (global.source_tree);
                fallback = source_find_mount (source->fallback_mount);
                if (fallback && fallback->running && fallback->listeners)
                {
                   ICECAST_LOG_DEBUG("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
                   source->on_demand_req = 1;
                }
                avl_tree_unlock (global.source_tree);
            }
            /* still nobody asking for it, so leave it idle */
            if (source->on_demand_req == 0)
                launch = 0;
        }
    }

    if (launch)
    {
        relay->start = time(NULL) + 5;
        relay->running = 1;
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
    }

    /* the relay thread may have shut itself down */
    if (relay->cleanup)
    {
        if (relay->thread)
        {
            ICECAST_LOG_DEBUG("waiting for relay thread for \"%s\"", relay->localmount);
            thread_join (relay->thread);
            relay->thread = NULL;
        }
        relay->cleanup = 0;
        relay->running = 0;

        if (relay->on_demand && relay->source)
        {
            ice_config_t *config = config_get_config ();
            mount_proxy *mountinfo = config_find_mount (config, relay->localmount, MOUNT_TYPE_NORMAL);

            source_update_settings (config, relay->source, mountinfo);
            config_release_config ();
            stats_event (relay->localmount, "listeners", "0");
        }
    }
}

Michael Smith's avatar
Michael Smith committed
483

484 485 486 487 488 489 490 491 492 493 494 495 496
/* Compare two relay descriptions.
 *
 * Returns 1 when mount, server, port or mp3metadata differ, meaning the
 * relay needs to be restarted, and 0 otherwise. In the 0 case a differing
 * on_demand setting is copied from `new` into `old`.
 */
static int relay_has_changed (relay_server *new, relay_server *old)
{
    if (strcmp (new->mount, old->mount) != 0
            || strcmp (new->server, old->server) != 0
            || new->port != old->port
            || new->mp3metadata != old->mp3metadata)
        return 1;

    /* on_demand alone is picked up in place, no restart */
    if (new->on_demand != old->on_demand)
        old->on_demand = new->on_demand;

    return 0;
}


507 508 509 510 511
/* Go through `updated` looking for relays that are configured differently
 * from those in `*current`.
 *
 * For every entry of `updated`, an entry of `*current` with the same
 * localmount and unchanged settings (see relay_has_changed) is unlinked
 * from `*current` and moved to the returned list; otherwise a copy of the
 * updated entry is added to the returned list.
 *
 * Returns the list of relays that should be kept running (built by
 * prepending, so in reverse order of `updated`). On return `*current` holds
 * only the relays that were not matched, i.e. the ones to shut down.
 */
static relay_server *
update_relay_set(relay_server **current, relay_server *updated)
{
    relay_server *relay;
    relay_server *new_list = NULL;

    for (relay = updated; relay; relay = relay->next)
    {
        relay_server **existing_p = current;
        relay_server *existing_relay = *current;

        while (existing_relay)
        {
            /* break out if keeping relay */
            if (strcmp (relay->localmount, existing_relay->localmount) == 0)
                if (relay_has_changed (relay, existing_relay) == 0)
                    break;
            existing_p = &existing_relay->next;
            existing_relay = existing_relay->next;
        }
        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
            existing_relay = relay_copy (relay);
            /* relay_copy returns NULL when out of memory; leave this relay
             * out rather than dereference NULL. It is not in *current
             * either, so a later update sees it as new again. */
            if (existing_relay == NULL)
                continue;
        }
        else
        {
            /* unlink the kept entry from the current list */
            *existing_p = existing_relay->next;
        }
        existing_relay->next = new_list;
        new_list = existing_relay;
    }
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
551
 * are separated and returned in a separate list
552
 */
553
static relay_server *
554 555
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
556
    relay_server *active_relays, *cleanup_relays;
557

558
    active_relays = update_relay_set(relay_list, new_relay_list);
559

560 561 562 563 564 565 566 567
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


568 569
/* Process two relay lists.
 *
 * to_free:    every entry is shut down (if running) and released with
 *             relay_free(); the list must not be used afterwards.
 * to_start:   every entry is passed to check_relay_stream().
 * skip_timer: when non-zero, each entry's start time is reset to 0 before
 *             it is checked.
 */
static void relay_check_streams (relay_server *to_start,
        relay_server *to_free, int skip_timer)
{
    relay_server *relay;

    while (to_free)
    {
        if (to_free->source && to_free->running)
        {
            /* relay has been removed from xml, shut down active relay */
            ICECAST_LOG_DEBUG("source shutdown request on \"%s\"", to_free->localmount);
            to_free->running = 0;
            to_free->source->running = 0;
            thread_join (to_free->thread);
        }
        else if (to_free->source)
        {
            stats_event (to_free->localmount, NULL, NULL);
        }
        to_free = relay_free (to_free);
    }

    for (relay = to_start; relay; relay = relay->next)
    {
        if (skip_timer)
            relay->start = 0;
        check_relay_stream (relay);
    }
}

601 602 603 604 605

/* Fetch /admin/streamlist.txt from the configured master server and update
 * global.master_relays from it.
 *
 * config: the currently held configuration; master_username,
 *         master_password, master_server, master_server_port and on_demand
 *         are read from it.
 *
 * Returns 0 when no master is configured (username, password or server
 * missing, or port 0); the configuration is still held in that case.
 * Returns 1 in every other case, including failures to reach or query the
 * master: config_release_config() has been called by then, so the caller
 * has to re-acquire the configuration before using it again.
 */
static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username= NULL;
    int port;
    sock_t mastersock;
    int ret = 0;
    char buf[256];
    do
    {
        char *authheader, *data;
        relay_server *new_relays = NULL, *cleanup_relays;
        size_t len;
        int count = 1;
        int on_demand;

        if (config->master_username)
            username = strdup(config->master_username);
        if (config->master_password)
            password = strdup(config->master_password);

        if (config->master_server)
            master = strdup(config->master_server);

        port = config->master_server_port;

        if (username == NULL || password == NULL || master == NULL || port == 0)
            break;
        on_demand = config->on_demand;
        ret = 1;
        config_release_config();
        mastersock = sock_connect_wto(master, port, 10);

        if (mastersock == SOCK_ERROR)
        {
            ICECAST_LOG_WARN("Relay slave failed to contact master server to fetch stream list");
            break;
        }

        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        if (authheader == NULL)
        {
            sock_close (mastersock);
            break;
        }
        snprintf (authheader, len, "%s:%s", username, password);
        /* len counts the terminating NUL too; encode only the text */
        data = util_base64_encode(authheader, strlen(authheader));
        free(authheader);
        if (data == NULL)
        {
            sock_close (mastersock);
            break;
        }
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(data);

        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
                ((strncmp (buf, "HTTP/1.0 200", 12) != 0) && (strncmp (buf, "HTTP/1.1 200", 12) != 0)))
        {
            sock_close (mastersock);
            ICECAST_LOG_WARN("Master rejected streamlist request");
            break;
        } else {
            ICECAST_LOG_INFO("Master accepted streamlist request");
        }

        /* skip the remaining response headers, up to the empty line */
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }
        /* every non-empty body line describes one stream on the master */
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            relay_server *r;
            xmlURIPtr parsed_uri;

            if (!strlen(buf))
                continue;
            ICECAST_LOG_DEBUG("read %d from master \"%s\"", count++, buf);
            parsed_uri = xmlParseURI(buf);
            if (parsed_uri == NULL) {
                ICECAST_LOG_DEBUG("Error while parsing line from master. Ignoring line.");
                continue;
            }
            /* a URI without a path gives no mountpoint to relay */
            if (parsed_uri->path == NULL) {
                ICECAST_LOG_DEBUG("Error while parsing line from master. Ignoring line.");
                xmlFreeURI(parsed_uri);
                continue;
            }
            r = calloc (1, sizeof (relay_server));
            if (r)
            {
                /* strings are allocated with xmlCharStrdup throughout, as
                 * relay_free() releases them with xmlFree */
                if (parsed_uri->server != NULL)
                {
                  r->server = (char *)xmlCharStrdup (parsed_uri->server);
                  if (parsed_uri->port == 0)
                    r->port = 80;
                  else
                    r->port = parsed_uri->port;
                }
                else
                {
                  /* line held a bare path: the stream lives on the master */
                  r->server = (char *)xmlCharStrdup (master);
                  r->port = port;
                }

                r->mount = (char *)xmlCharStrdup (parsed_uri->path);
                r->localmount = (char *)xmlCharStrdup (parsed_uri->path);
                r->mp3metadata = 1;
                r->on_demand = on_demand;
                r->next = new_relays;
                ICECAST_LOG_DEBUG("Added relay host=\"%s\", port=%d, mount=\"%s\"", r->server, r->port, r->mount);
                new_relays = r;
            }
            xmlFreeURI(parsed_uri);
        }
        sock_close (mastersock);

        thread_mutex_lock (&(config_locks()->relay_lock));
        cleanup_relays = update_relays (&global.master_relays, new_relays);

        relay_check_streams (global.master_relays, cleanup_relays, 0);
        /* update_relays copied what it needed, so the parsed list can go */
        relay_check_streams (NULL, new_relays, 0);

        thread_mutex_unlock (&(config_locks()->relay_lock));

    } while(0);

    free (master);
    free (username);
    free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
728
    unsigned int interval = 0;
Michael Smith's avatar
Michael Smith committed
729

730 731
    (void)arg;

732
    thread_mutex_lock(&_slave_mutex);
733
    update_settings = 0;
734
    update_all_mounts = 0;
735
    thread_mutex_unlock(&_slave_mutex);
736

Karl Heyes's avatar
Karl Heyes committed
737
    config = config_get_config();
738
    stats_global(config);
Karl Heyes's avatar
Karl Heyes committed
739
    config_release_config();
740
    source_recheck_mounts(1);
741

742
    while (1)
743
    {
744 745
        relay_server *cleanup_relays = NULL;
        int skip_timer = 0;
746

747
        /* re-read xml file if requested */
748
        global_lock();
749 750
        if (global.schedule_config_reread) {
            config_reread_config();
751
            global.schedule_config_reread = 0;
752
        }
753
        global_unlock();
754

755
        thread_sleep(1000000);
756 757
        if (slave_running == 0)
            break;
758 759

        ++interval;
760

761
        /* only update relays lists when required */
762
        thread_mutex_lock(&_slave_mutex);
763 764
        if (max_interval <= interval)
        {
765
            ICECAST_LOG_DEBUG("checking master stream list");
766
            config = config_get_config();
Michael Smith's avatar
Michael Smith committed
767

768 769
            if (max_interval == 0)
                skip_timer = 1;
770 771
            interval = 0;
            max_interval = config->master_update_interval;
772
            thread_mutex_unlock(&_slave_mutex);
Michael Smith's avatar
Michael Smith committed
773

774 775 776
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
777

778
            thread_mutex_lock (&(config_locks()->relay_lock));
779

780
            cleanup_relays = update_relays (&global.relays, config->relay);
781

782 783 784
            config_release_config();
        }
        else
785 786
        {
            thread_mutex_unlock(&_slave_mutex);
787
            thread_mutex_lock (&(config_locks()->relay_lock));
788
        }
789 790 791 792 793

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

794
        thread_mutex_lock(&_slave_mutex);
795 796
        if (update_settings)
        {
797
            source_recheck_mounts (update_all_mounts);
798
            update_settings = 0;
799
            update_all_mounts = 0;
800
        }
801
        thread_mutex_unlock(&_slave_mutex);
802
    }
803
    ICECAST_LOG_INFO("shutting down current relays");
804 805
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
806

807
    ICECAST_LOG_INFO("Slave thread shutdown complete");
808

809
    return NULL;
810
}
Michael Smith's avatar
Michael Smith committed
811