slave.c 28.4 KB
Newer Older
1 2 3 4 5
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
6
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
7 8 9 10
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
11
 * Copyright 2012-2018, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
12 13
 */

14
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
15 16 17 18 19 20 21
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

22 23 24 25
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

26 27 28 29 30 31 32 33 34 35 36 37
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#endif

38
#include "compat.h"
39

40
#include <libxml/uri.h>
Marvin Scholz's avatar
Marvin Scholz committed
41 42 43 44
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "common/net/sock.h"
#include "common/httpp/httpp.h"
45

46
#include "slave.h"
47
#include "cfgfile.h"
48 49 50 51 52 53 54 55
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
56
#include "format.h"
57 58 59

#define CATMODULE "slave"

60 61 62 63 64 65 66 67 68 69
struct relay_tag {
    relay_config_t *config;
    source_t *source;
    int running;
    int cleanup;
    time_t start;
    thread_type *thread;
    relay_t *next;
};

70
static void *_slave_thread(void *arg);
71
static thread_type *_slave_thread_id;
72
static int slave_running = 0;
73
static volatile int update_settings = 0;
74
static volatile int update_all_mounts = 0;
75
static volatile unsigned int max_interval = 0;
76
static mutex_t _slave_mutex; // protects slave_running, update_settings, update_all_mounts, max_interval
77

78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
static inline void relay_config_upstream_free (relay_config_upstream_t *upstream)
{
    if (upstream->server)
        xmlFree(upstream->server);
    if (upstream->mount)
        xmlFree(upstream->mount);
    if (upstream->username)
        xmlFree(upstream->username);
    if (upstream->password)
        xmlFree(upstream->password);
}

void relay_config_free (relay_config_t *relay)
{
    size_t i;

    ICECAST_LOG_DEBUG("freeing relay config for %s", relay->localmount);

    for (i = 0; i < relay->upstreams; i++) {
        relay_config_upstream_free(&(relay->upstream[i]));
    }

    relay_config_upstream_free(&(relay->upstream_default));

    xmlFree(relay->localmount);
    free(relay->upstream);
    free(relay);
}

relay_t *relay_free (relay_t *relay)
108
{
109 110 111 112
    relay_t *next = relay->next;

    ICECAST_LOG_DEBUG("freeing relay %s", relay->config->localmount);

113 114
    if (relay->source)
       source_free_source (relay->source);
115 116 117 118

    relay_config_free(relay->config);

    free(relay);
119
    return next;
120 121
}

122

123
static inline void relay_config_upstream_copy(relay_config_upstream_t *dst, const relay_config_upstream_t *src)
124
{
125 126
    dst->server = (char *)xmlCharStrdup(src->server);
    dst->mount = (char *)xmlCharStrdup(src->mount);
127

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
    if (src->username)
        dst->username = (char *)xmlCharStrdup(src->username);
    if (src->password)
        dst->password = (char *)xmlCharStrdup(src->password);

    dst->port = src->port;

    dst->mp3metadata = src->mp3metadata;
}

static inline relay_config_t *relay_config_copy (relay_config_t *r)
{
    relay_config_t *copy = calloc (1, sizeof (relay_config_t));
    relay_config_upstream_t *u = NULL;
    size_t i;

    if (r->upstreams) {
        u = calloc(r->upstreams, sizeof(relay_config_upstream_t));
        if (!u) {
            free(copy);
            return NULL;
        }
    }

    if (!copy) {
        free(u);
        return NULL;
155
    }
156 157

    copy->upstream = u;
158
    copy->upstreams = r->upstreams;
159 160 161 162 163 164 165

    copy->localmount = (char *)xmlCharStrdup(r->localmount);
    copy->on_demand = r->on_demand;

    relay_config_upstream_copy(&(copy->upstream_default), &(r->upstream_default));

    for (i = 0; i < r->upstreams; i++)
Philipp Schafft's avatar
Philipp Schafft committed
166
        relay_config_upstream_copy(&(copy->upstream[i]), &(r->upstream[i]));
167 168


169 170
    return copy;
}
171

172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
static inline relay_t *relay_new(relay_config_t *config)
{
    relay_t *r = calloc(1, sizeof(*r));

    if (!r)
        return NULL;

    r->config = relay_config_copy(config);
    if (!r->config) {
        free(r);
        return NULL;
    }

    return r;
}
187

188
/* force a recheck of the relays. This will recheck the master server if
189
 * this is a slave and rebuild all mountpoints in the stats tree
190
 */
191
void slave_update_all_mounts(void)
192
{
193
    thread_mutex_lock(&_slave_mutex);
194
    max_interval = 0;
195
    update_all_mounts = 1;
196
    update_settings = 1;
197
    thread_mutex_unlock(&_slave_mutex);
198 199
}

200 201 202

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
203
 */
204
void slave_rebuild_mounts(void)
205
{
206
    thread_mutex_lock(&_slave_mutex);
207
    update_settings = 1;
208
    thread_mutex_unlock(&_slave_mutex);
209 210
}

211 212

void slave_initialize(void)
213
{
214 215
    if (slave_running)
        return;
216

217
    slave_running = 1;
218
    max_interval = 0;
219
    thread_mutex_create (&_slave_mutex);
220 221
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
222

223

224 225
void slave_shutdown(void)
{
226 227 228
    thread_mutex_lock(&_slave_mutex);
    if (!slave_running) {
        thread_mutex_unlock(&_slave_mutex);
229
        return;
230
    }
231
    slave_running = 0;
232 233
    thread_mutex_unlock(&_slave_mutex);

234
    ICECAST_LOG_DEBUG("waiting for slave thread");
235 236 237 238
    thread_join (_slave_thread_id);
}


239 240
/* Actually open the connection and do some http parsing, handle any 302
 * responses within here.
241
 */
242 243
#define _GET_UPSTREAM_SETTING(n) ((upstream && upstream->n) ? upstream->n : relay->config->upstream_default.n)
static client_t *open_relay_connection (relay_t *relay, relay_config_upstream_t *upstream)
244
{
245
    int redirects = 0;
246 247
    char *server_id = NULL;
    ice_config_t *config;
248 249
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
250 251 252
    char *server = strdup (_GET_UPSTREAM_SETTING(server));
    char *mount = strdup (_GET_UPSTREAM_SETTING(mount));
    int port = _GET_UPSTREAM_SETTING(port);
253
    char *auth_header;
254 255
    char header[4096];

256 257 258 259
    config = config_get_config ();
    server_id = strdup (config->server_id);
    config_release_config ();

260
    /* build any authentication header before connecting */
261
    if (_GET_UPSTREAM_SETTING(username) && _GET_UPSTREAM_SETTING(password))
262
    {
263
        char *esc_authorisation;
264
        unsigned len = strlen(_GET_UPSTREAM_SETTING(username)) + strlen(_GET_UPSTREAM_SETTING(password)) + 2;
265 266

        auth_header = malloc (len);
267
        snprintf (auth_header, len, "%s:%s", _GET_UPSTREAM_SETTING(username), _GET_UPSTREAM_SETTING(password));
268
        esc_authorisation = util_base64_encode(auth_header, len);
269 270 271 272 273 274 275 276 277
        free(auth_header);
        len = strlen (esc_authorisation) + 24;
        auth_header = malloc (len);
        snprintf (auth_header, len,
                "Authorization: Basic %s\r\n", esc_authorisation);
        free(esc_authorisation);
    }
    else
        auth_header = strdup ("");
278

279 280 281 282
    while (redirects < 10)
    {
        sock_t streamsock;

283
        ICECAST_LOG_INFO("connecting to %s:%d", server, port);
284

285
        streamsock = sock_connect_wto_bind (server, port, _GET_UPSTREAM_SETTING(bind), 10);
286 287
        if (streamsock == SOCK_ERROR)
        {
288
            ICECAST_LOG_WARN("Failed to connect to %s:%d", server, port);
289
            break;
290
        }
291
        con = connection_create(streamsock, NULL, NULL, strdup(server));
292

293 294 295 296 297 298
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
299
                "User-Agent: %s\r\n"
300
                "Host: %s\r\n"
301
                "%s"
302
                "%s"
303
                "\r\n",
304
                mount,
305
                server_id,
306
                server,
307
                _GET_UPSTREAM_SETTING(mp3metadata) ? "Icy-MetaData: 1\r\n" : "",
308
                auth_header);
309
        memset (header, 0, sizeof(header));
310
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
311
        {
312
            ICECAST_LOG_ERROR("Header read failed for %s (%s:%d%s)", relay->config->localmount, server, port, mount);
313 314 315 316
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
317
        if (! httpp_parse_response (parser, header, strlen(header), relay->config->localmount))
318
        {
319
            ICECAST_LOG_ERROR("Error parsing relay request for %s (%s:%d%s)", relay->config->localmount,
320
                    server, port, mount);
321 322
            break;
        }
323
        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
324
        {
325 326 327
            /* better retry the connection again but with different details */
            const char *uri, *mountpoint;
            int len;
328

329
            uri = httpp_getvar (parser, "location");
330
            ICECAST_LOG_INFO("redirect received %s", uri);
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
            if (strncmp (uri, "http://", 7) != 0)
                break;
            uri += 7;
            mountpoint = strchr (uri, '/');
            free (mount);
            if (mountpoint)
                mount = strdup (mountpoint);
            else
                mount = strdup ("/");

            len = strcspn (uri, ":/");
            port = 80;
            if (uri [len] == ':')
                port = atoi (uri+len+1);
            free (server);
            server = calloc (1, len+1);
            strncpy (server, uri, len);
            connection_close (con);
            httpp_destroy (parser);
350 351 352
            con = NULL;
            parser = NULL;
        }
353 354 355 356 357 358
        else
        {
            client_t *client = NULL;

            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
            {
359
                ICECAST_LOG_ERROR("Error from relay request: %s (%s)", relay->config->localmount,
360 361 362 363 364 365 366 367 368 369 370 371 372 373
                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                break;
            }
            global_lock ();
            if (client_create (&client, con, parser) < 0)
            {
                global_unlock ();
                /* make sure only the client_destory frees these */
                con = NULL;
                parser = NULL;
                client_destroy (client);
                break;
            }
            global_unlock ();
374
            sock_set_blocking (streamsock, 0);
375
            client_set_queue (client, NULL);
376
            client_complete(client);
377 378
            free (server);
            free (mount);
379
            free (server_id);
380 381 382 383 384 385 386 387 388
            free (auth_header);

            return client;
        }
        redirects++;
    }
    /* failed, better clean up */
    free (server);
    free (mount);
389
    free (server_id);
390 391 392 393 394 395 396 397 398 399 400 401 402 403
    free (auth_header);
    if (con)
        connection_close (con);
    if (parser)
        httpp_destroy (parser);
    return NULL;
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
static void *start_relay_stream (void *arg)
{
404
    relay_t *relay = arg;
405 406 407
    source_t *src = relay->source;
    client_t *client;

408
    ICECAST_LOG_INFO("Starting relayed source at mountpoint \"%s\"", relay->config->localmount);
409 410
    do
    {
411 412 413 414 415 416 417 418 419 420 421 422 423 424
        size_t i;

        for (i = 0; i < relay->config->upstreams; i++) {
            ICECAST_LOG_DEBUG("For relay on mount \"%s\", trying upstream #%zu", relay->config->localmount, i);
            client = open_relay_connection(relay, &(relay->config->upstream[i]));
            if (client)
                break;
        }

        /* if we have no upstreams defined, use the default upstream */
        if (!relay->config->upstreams) {
            ICECAST_LOG_DEBUG("For relay on mount \"%s\" with no upstreams trying upstream default", relay->config->localmount);
            client = open_relay_connection(relay, NULL);
        }
425 426 427 428 429 430 431

        if (client == NULL)
            continue;

        src->client = client;
        src->parser = client->parser;
        src->con = client->con;
432 433

        if (connection_complete_source (src, 0) < 0)
434
        {
435
            ICECAST_LOG_INFO("Failed to complete source initialisation");
436 437 438
            client_destroy (client);
            src->client = NULL;
            continue;
439
        }
440
        stats_event_inc(NULL, "source_relay_connections");
441
        stats_event (relay->config->localmount, "source_ip", client->con->ip);
442

443 444
        source_main (relay->source);

445
        if (relay->config->on_demand == 0)
446 447
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
448
            yp_remove (relay->config->localmount);
449
            relay->source->yp_public = -1;
450
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
451
            slave_update_all_mounts();
452 453
        }

454
        /* we've finished, now get cleaned up */
455
        relay->cleanup = 1;
456
        slave_rebuild_mounts();
457 458

        return NULL;
459
    } while (0); /* TODO allow looping through multiple servers */
460

461 462 463 464
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

465
        ICECAST_LOG_DEBUG("failed relay, fallback to %s", relay->source->fallback_mount);
466
        avl_tree_rlock(global.source_tree);
467
        fallback_source = source_find_mount(relay->source->fallback_mount);
468 469

        if (fallback_source != NULL)
470
            source_move_clients(relay->source, fallback_source);
471

472
        avl_tree_unlock(global.source_tree);
473 474
    }

475
    source_clear_source(relay->source);
476

477
    /* cleanup relay, but prevent this relay from starting up again too soon */
478 479
    thread_mutex_lock(&_slave_mutex);
    thread_mutex_lock(&(config_locks()->relay_lock));
Karl Heyes's avatar
Karl Heyes committed
480
    relay->source->on_demand = 0;
481
    relay->start = time(NULL) + max_interval;
482
    relay->cleanup = 1;
483 484
    thread_mutex_unlock(&(config_locks()->relay_lock));
    thread_mutex_unlock(&_slave_mutex);
485 486

    return NULL;
487 488 489 490
}


/* wrapper for starting the provided relay stream */
491
static void check_relay_stream (relay_t *relay)
492 493 494
{
    if (relay->source == NULL)
    {
495
        if (relay->config->localmount[0] != '/')
496
        {
497
            ICECAST_LOG_WARN("relay mountpoint \"%s\" does not start with /, skipping",
498
                    relay->config->localmount);
499 500
            return;
        }
501
        /* new relay, reserve the name */
502
        relay->source = source_reserve (relay->config->localmount);
503
        if (relay->source)
504
        {
505 506
            ICECAST_LOG_DEBUG("Adding relay source at mountpoint \"%s\"", relay->config->localmount);
            if (relay->config->on_demand)
Karl Heyes's avatar
Karl Heyes committed
507 508
            {
                ice_config_t *config = config_get_config ();
509 510
                mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
                relay->source->on_demand = relay->config->on_demand;
Karl Heyes's avatar
Karl Heyes committed
511 512 513
                if (mountinfo == NULL)
                    source_update_settings (config, relay->source, mountinfo);
                config_release_config ();
514
                stats_event (relay->config->localmount, "listeners", "0");
515
                slave_update_all_mounts();
Karl Heyes's avatar
Karl Heyes committed
516
            }
517
        }
518
        else
519 520 521
        {
            if (relay->start == 0)
            {
522
                ICECAST_LOG_WARN("new relay but source \"%s\" already exists", relay->config->localmount);
523 524 525 526
                relay->start = 1;
            }
            return;
        }
527
    }
528
    do
529
    {
530
        source_t *source = relay->source;
531 532
        /* skip relay if active, not configured or just not time yet */
        if (relay->source == NULL || relay->running || relay->start > time(NULL))
533
            break;
534
        /* check if an inactive on-demand relay has a fallback that has listeners */
535
        if (relay->config->on_demand && source->on_demand_req == 0)
536
        {
537
            relay->source->on_demand = relay->config->on_demand;
538 539 540 541 542 543 544 545

            if (source->fallback_mount && source->fallback_override)
            {
                source_t *fallback;
                avl_tree_rlock (global.source_tree);
                fallback = source_find_mount (source->fallback_mount);
                if (fallback && fallback->running && fallback->listeners)
                {
546
                   ICECAST_LOG_DEBUG("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
547 548 549 550 551 552 553 554
                   source->on_demand_req = 1;
                }
                avl_tree_unlock (global.source_tree);
            }
            if (source->on_demand_req == 0)
                break;
        }

555
        relay->start = time(NULL) + 5;
556
        relay->running = 1;
557 558 559
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
560

561
    } while (0);
562
    /* the relay thread may of shut down itself */
563
    if (relay->cleanup)
564
    {
565 566
        if (relay->thread)
        {
567
            ICECAST_LOG_DEBUG("waiting for relay thread for \"%s\"", relay->config->localmount);
568 569 570
            thread_join (relay->thread);
            relay->thread = NULL;
        }
571 572
        relay->cleanup = 0;
        relay->running = 0;
573

574
        if (relay->config->on_demand && relay->source)
575 576
        {
            ice_config_t *config = config_get_config ();
577
            mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
578 579
            source_update_settings (config, relay->source, mountinfo);
            config_release_config ();
580
            stats_event (relay->config->localmount, "listeners", "0");
581
        }
582
    }
583 584
}

585

586 587 588
/* compare the 2 relays to see if there are any changes, return 1 if
 * the relay needs to be restarted, 0 otherwise
 */
589 590 591
#define _EQ_STR(a,b) (((a) == (b)) || ((a) != NULL && (b) != NULL && strcmp((a), (b)) == 0))
#define _EQ_ATTR(x) (_EQ_STR((new->x), (old->x)))
static int relay_has_changed_upstream(const relay_config_upstream_t *new, const relay_config_upstream_t *old)
592
{
593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634
    if (new->mp3metadata != old->mp3metadata)
        return 1;

    if (!_EQ_ATTR(server) || new->port != old->port)
        return 1;

    if (!_EQ_ATTR(mount))
        return 1;

/* NOTE: We currently do not consider this a relevant change. Why?
    if (!_EQ_ATTR(username) || !_EQ_ATTR(password))
        return 1;

    if (!_EQ_ATTR(bind))
        return 1;
*/

    return 0;
}

static int relay_has_changed (const relay_config_t *new, relay_config_t *old)
{
    size_t i;

    /* This is not fully true: If more upstreams has been added there is no reason
     * to restart the relay. However for now we ignore this case. TODO: Change this.
     */
    if (new->upstreams != old->upstreams)
        return 1;

    for (i = 0; i < new->upstreams; i++) {
        if (relay_has_changed_upstream(&(new->upstream[i]), &(old->upstream[i])))
            return 1;
    }

    if (relay_has_changed_upstream(&(new->upstream_default), &(old->upstream_default)))
        return 1;

    /* Why do we do this here? */
    old->on_demand = new->on_demand;

    return 0;
635 636 637
}


638 639 640 641
/* go through updated looking for relays that are different configured. The
 * returned list contains relays that should be kept running, current contains
 * the list of relays to shutdown
 */
642 643
static relay_t *
update_relay_set(relay_t **current, relay_config_t **updated, size_t updated_length)
644
{
645 646 647 648 649 650 651
    relay_config_t *relay;
    relay_t *existing_relay, **existing_p;
    relay_t *new_list = NULL;
    size_t i;

    for (i = 0; i < updated_length; i++) {
        relay = updated[i];
652

653 654 655 656 657 658
        existing_relay = *current;
        existing_p = current;

        while (existing_relay)
        {
            /* break out if keeping relay */
659 660
            if (strcmp(relay->localmount, existing_relay->config->localmount) == 0)
                if (relay_has_changed(relay, existing_relay->config) == 0)
661
                    break;
662
            existing_p = &existing_relay->next;
663

664 665
            existing_relay = existing_relay->next;
        }
666 667


668 669 670
        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
671
            existing_relay = relay_new(relay);
672 673 674 675 676 677 678
        }
        else
        {
            *existing_p = existing_relay->next;
        }
        existing_relay->next = new_list;
        new_list = existing_relay;
679
    }
680

681 682 683 684 685 686
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
687
 * are separated and returned in a separate list
688
 */
689 690
static relay_t *
update_relays (relay_t **relay_list, relay_config_t **new_relay_list, size_t new_relay_list_length)
691
{
692
    relay_t *active_relays, *cleanup_relays;
693

694
    active_relays = update_relay_set(relay_list, new_relay_list, new_relay_list_length);
695

696 697 698 699 700 701 702 703
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


704 705
static void relay_check_streams (relay_t *to_start,
        relay_t *to_free, int skip_timer)
706
{
707
    relay_t *relay;
708 709

    while (to_free)
710
    {
711
        if (to_free->source)
712
        {
713 714 715
            if (to_free->running)
            {
                /* relay has been removed from xml, shut down active relay */
716
                ICECAST_LOG_DEBUG("source shutdown request on \"%s\"", to_free->config->localmount);
717
                to_free->running = 0;
718 719 720 721
                to_free->source->running = 0;
                thread_join (to_free->thread);
            }
            else
722
                stats_event (to_free->config->localmount, NULL, NULL);
723
        }
724 725 726 727 728 729
        to_free = relay_free (to_free);
    }

    relay = to_start;
    while (relay)
    {
730 731
        if (skip_timer)
            relay->start = 0;
732 733
        check_relay_stream (relay);
        relay = relay->next;
734
    }
735 736
}

737 738 739 740 741

static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username= NULL;
    int port;
742
    sock_t mastersock;
743
    int ret = 0;
744
    char buf[256];
745 746 747
    do
    {
        char *authheader, *data;
748 749 750
        relay_t *cleanup_relays;
        relay_config_t **new_relays = NULL;
        size_t new_relays_length = 0;
751
        int len, count = 1;
752
        int on_demand;
753
        size_t i;
754

755
        username = strdup(config->master_username);
756
        if (config->master_password)
757
            password = strdup(config->master_password);
758

759
        if (config->master_server)
760
            master = strdup(config->master_server);
761

762
        port = config->master_server_port;
Michael Smith's avatar
Michael Smith committed
763

764 765
        if (password == NULL || master == NULL || port == 0)
            break;
766
        on_demand = config->on_demand;
767 768
        ret = 1;
        config_release_config();
769
        mastersock = sock_connect_wto(master, port, 10);
770

771 772
        if (mastersock == SOCK_ERROR)
        {
773
            ICECAST_LOG_WARN("Relay slave failed to contact master server to fetch stream list");
774 775
            break;
        }
Michael Smith's avatar
Michael Smith committed
776

777 778 779
        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        snprintf (authheader, len, "%s:%s", username, password);
780
        data = util_base64_encode(authheader, len);
781 782 783 784 785 786 787
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(authheader);
        free(data);

788
        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
789
                ((strncmp (buf, "HTTP/1.0 200", 12) != 0) && (strncmp (buf, "HTTP/1.1 200", 12) != 0)))
790 791
        {
            sock_close (mastersock);
792
            ICECAST_LOG_WARN("Master rejected streamlist request");
793
            break;
794 795
        } else {
            ICECAST_LOG_INFO("Master accepted streamlist request");
796 797
        }

798 799 800 801 802 803 804
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
805 806 807
            relay_config_t *c = NULL;
            relay_config_t **n;

808 809
            if (!strlen(buf))
                continue;
810
            ICECAST_LOG_DEBUG("read %d from master \"%s\"", count++, buf);
811 812
            xmlURIPtr parsed_uri = xmlParseURI(buf);
            if (parsed_uri == NULL) {
813
                ICECAST_LOG_DEBUG("Error while parsing line from master. Ignoring line.");
814 815
                continue;
            }
816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835

            n = realloc(new_relays, sizeof(*new_relays)*(new_relays_length + 1));
            if (n) {
                new_relays = n;

                c = calloc(1, sizeof(*c));
                new_relays[new_relays_length++] = c;
            }

            if (c) {
                if (parsed_uri->server != NULL) {
                    c->upstream_default.server = strdup(parsed_uri->server);
                    if (parsed_uri->port == 0) {
                        c->upstream_default.port = 80;
                    } else {
                        c->upstream_default.port = parsed_uri->port;
                    }
                } else {
                    c->upstream_default.server = (char *)xmlCharStrdup (master);
                    c->upstream_default.port = port;
836 837
                }

838 839 840 841 842
                c->upstream_default.mount = strdup(parsed_uri->path);
                c->localmount = strdup(parsed_uri->path);
                c->upstream_default.mp3metadata = 1;
                c->on_demand = on_demand;
                ICECAST_LOG_DEBUG("Added relay host=\"%s\", port=%d, mount=\"%s\"", c->upstream_default.server, c->upstream_default.port, c->upstream_default.mount);
843
            }
844
            xmlFreeURI(parsed_uri);
Michael Smith's avatar
Michael Smith committed
845
        }
846 847
        sock_close (mastersock);

848
        thread_mutex_lock (&(config_locks()->relay_lock));
849
        cleanup_relays = update_relays (&global.master_relays, new_relays, new_relays_length);
850

851
        relay_check_streams (global.master_relays, cleanup_relays, 0);
852 853 854 855 856

        for (i = 0; i < new_relays_length; i++) {
            relay_config_free(new_relays[i]);
        }
        free(new_relays);
857 858 859

        thread_mutex_unlock (&(config_locks()->relay_lock));

860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875
    } while(0);

    if (master)
        free (master);
    if (username)
        free (username);
    if (password)
        free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
876
    unsigned int interval = 0;
877

878 879
    (void)arg;

880
    thread_mutex_lock(&_slave_mutex);
881
    update_settings = 0;
882
    update_all_mounts = 0;
883
    thread_mutex_unlock(&_slave_mutex);
884

885
    config = config_get_config();
886
    stats_global(config);
887
    config_release_config();
888
    source_recheck_mounts(1);
889

890
    while (1)
891
    {
892
        relay_t *cleanup_relays = NULL;
893
        int skip_timer = 0;
894

895
        /* re-read xml file if requested */
896
        global_lock();
897 898
        if (global.schedule_config_reread) {
            config_reread_config();
899
            global.schedule_config_reread = 0;
900
        }
901
        global_unlock();
902

903
        thread_sleep(1000000);
904 905 906
        thread_mutex_lock(&_slave_mutex);
        if (slave_running == 0) {
            thread_mutex_unlock(&_slave_mutex);
907
            break;
908 909
        }
        thread_mutex_unlock(&_slave_mutex);
910 911

        ++interval;
912

913
        /* only update relays lists when required */
914
        thread_mutex_lock(&_slave_mutex);
915 916
        if (max_interval <= interval)
        {
917
            ICECAST_LOG_DEBUG("checking master stream list");
918
            config = config_get_config();
919

920 921
            if (max_interval == 0)
                skip_timer = 1;
922 923
            interval = 0;
            max_interval = config->master_update_interval;
924
            thread_mutex_unlock(&_slave_mutex);
925

926 927 928
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
929

930
            thread_mutex_lock (&(config_locks()->relay_lock));
931

932
            cleanup_relays = update_relays(&global.relays, config->relay, config->relay_length);
933

934 935 936
            config_release_config();
        }
        else
937 938
        {
            thread_mutex_unlock(&_slave_mutex);
939
            thread_mutex_lock (&(config_locks()->relay_lock));
940
        }
941 942 943 944 945

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

946
        thread_mutex_lock(&_slave_mutex);
947 948
        if (update_settings)
        {
949
            source_recheck_mounts (update_all_mounts);
950
            update_settings = 0;
951
            update_all_mounts = 0;
952
        }
953
        thread_mutex_unlock(&_slave_mutex);
954
    }
955
    ICECAST_LOG_INFO("shutting down current relays");
956 957
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
958

959
    ICECAST_LOG_INFO("Slave thread shutdown complete");
960

961
    return NULL;
962
}
963