/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 */

21 22 23 24
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

25 26 27 28 29 30 31 32 33 34 35 36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#endif

37
#include "compat.h"
38

39
#include <libxml/uri.h>
Marvin Scholz's avatar
Marvin Scholz committed
40 41 42 43
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "common/net/sock.h"
#include "common/httpp/httpp.h"
44

45
#include "slave.h"
46
#include "cfgfile.h"
47 48 49 50 51 52 53 54
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
55
#include "format.h"
56 57 58

#define CATMODULE "slave"

59 60 61 62 63 64 65 66 67 68
/* Runtime state of one relay (referred to as relay_t in this file).
 * Instances are created by relay_new() and released by relay_free().
 */
struct relay_tag {
    /* private copy of the relay configuration, released by relay_free() */
    relay_config_t *config;
    /* source reserved for config->localmount; NULL until check_relay_stream() reserves it */
    source_t *source;
    /* non-zero from the moment a relay thread is started until it has been joined */
    int running;
    /* set by the relay thread once it has finished and wants to be joined */
    int cleanup;
    /* earliest time at which check_relay_stream() may start the relay */
    time_t start;
    /* handle of the relay thread, if one has been created */
    thread_type *thread;
    /* next relay in the singly linked list */
    relay_t *next;
};

69
static void *_slave_thread(void *arg);
70
static thread_type *_slave_thread_id;
71
static int slave_running = 0;
72
static volatile int update_settings = 0;
73
static volatile int update_all_mounts = 0;
74
static volatile unsigned int max_interval = 0;
75
static mutex_t _slave_mutex; // protects update_settings, update_all_mounts, max_interval
76

77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
/* Release the strings held by an upstream entry (server, mount, username,
 * password). The entry itself is not freed.
 */
static inline void relay_config_upstream_free (relay_config_upstream_t *upstream)
{
    char *owned[] = {
        upstream->server,
        upstream->mount,
        upstream->username,
        upstream->password
    };
    size_t idx;

    for (idx = 0; idx < sizeof(owned) / sizeof(owned[0]); idx++) {
        if (owned[idx] != NULL)
            xmlFree(owned[idx]);
    }
}

/* Free a relay configuration: every entry of the upstream array, the default
 * upstream, the local mount name, the array and finally the structure.
 */
void relay_config_free (relay_config_t *relay)
{
    relay_config_upstream_t *entry = relay->upstream;
    size_t remaining = relay->upstreams;

    ICECAST_LOG_DEBUG("freeing relay config for %s", relay->localmount);

    /* walk the array front to back, releasing the strings of each entry */
    for (; remaining > 0; remaining--, entry++)
        relay_config_upstream_free(entry);

    relay_config_upstream_free(&(relay->upstream_default));

    xmlFree(relay->localmount);
    free(relay->upstream);
    free(relay);
}

/* Free a relay together with its source (if any) and its configuration copy.
 *
 * Returns the relay that followed it in the list, so a whole list can be
 * released with `while (r) r = relay_free(r);`.
 */
relay_t *relay_free (relay_t *relay)
{
    relay_t *next = relay->next;

    ICECAST_LOG_DEBUG("freeing relay %s", relay->config->localmount);

    if (relay->source)
       source_free_source (relay->source);

    relay_config_free(relay->config);

    free(relay);
    return next;
}

121

122
/* Duplicate the settings of one upstream entry into dst.
 *
 * dst is expected to be zero-initialised: username and password are only
 * written when src has them set. The strings are allocated with
 * xmlCharStrdup() and are released by relay_config_upstream_free().
 *
 * NOTE(review): open_relay_connection() reads the `bind` setting of an
 * upstream, but `bind` is neither copied here nor released in
 * relay_config_upstream_free() -- confirm whether that is intended.
 */
static inline void relay_config_upstream_copy(relay_config_upstream_t *dst, const relay_config_upstream_t *src)
{
    dst->server = (char *)xmlCharStrdup(src->server);
    dst->mount = (char *)xmlCharStrdup(src->mount);

    if (src->username)
        dst->username = (char *)xmlCharStrdup(src->username);
    if (src->password)
        dst->password = (char *)xmlCharStrdup(src->password);

    dst->port = src->port;

    dst->mp3metadata = src->mp3metadata;
}

/* Create a deep copy of a relay configuration.
 *
 * Returns the new configuration, to be released with relay_config_free(),
 * or NULL if memory could not be allocated.
 */
static inline relay_config_t *relay_config_copy (relay_config_t *r)
{
    relay_config_t *copy = calloc (1, sizeof (relay_config_t));
    relay_config_upstream_t *u = NULL;
    size_t i;

    if (!copy)
        return NULL;

    if (r->upstreams) {
        u = calloc(r->upstreams, sizeof(relay_config_upstream_t));
        if (!u) {
            free(copy);
            return NULL;
        }
    }

    copy->localmount = (char *)xmlCharStrdup(r->localmount);
    if (!copy->localmount) {
        /* the mount name is dereferenced unconditionally by the users of the
         * copy, so a relay without one is of no use */
        free(u);
        free(copy);
        return NULL;
    }

    copy->upstream = u;
    copy->upstreams = r->upstreams;
    copy->on_demand = r->on_demand;

    relay_config_upstream_copy(&(copy->upstream_default), &(r->upstream_default));

    for (i = 0; i < r->upstreams; i++)
        relay_config_upstream_copy(&(copy->upstream[i]), &(r->upstream[i]));

    return copy;
}
170

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
/* Allocate a zeroed relay that owns a private copy of config.
 * Returns NULL when either allocation fails.
 */
static inline relay_t *relay_new(relay_config_t *config)
{
    relay_t *fresh = calloc(1, sizeof(relay_t));

    if (fresh != NULL) {
        fresh->config = relay_config_copy(config);
        if (fresh->config == NULL) {
            free(fresh);
            fresh = NULL;
        }
    }

    return fresh;
}
186

187
/* force a recheck of the relays. This will recheck the master server if
188
 * this is a slave and rebuild all mountpoints in the stats tree
189
 */
190
void slave_update_all_mounts(void)
191
{
192
    thread_mutex_lock(&_slave_mutex);
193
    max_interval = 0;
194
    update_all_mounts = 1;
195
    update_settings = 1;
196
    thread_mutex_unlock(&_slave_mutex);
197 198
}

199 200 201

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
202
 */
203
void slave_rebuild_mounts(void)
204
{
205
    thread_mutex_lock(&_slave_mutex);
206
    update_settings = 1;
207
    thread_mutex_unlock(&_slave_mutex);
208 209
}

210 211

void slave_initialize(void)
212
{
213 214
    if (slave_running)
        return;
215

216
    slave_running = 1;
217
    max_interval = 0;
218
    thread_mutex_create (&_slave_mutex);
219 220
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
221

222

223 224 225
void slave_shutdown(void)
{
    if (!slave_running)
226
        return;
227
    slave_running = 0;
228
    ICECAST_LOG_DEBUG("waiting for slave thread");
229 230 231 232
    thread_join (_slave_thread_id);
}


233 234
/* Actually open the connection and do some http parsing, handle any 302
 * responses within here.
235
 */
236 237
#define _GET_UPSTREAM_SETTING(n) ((upstream && upstream->n) ? upstream->n : relay->config->upstream_default.n)
static client_t *open_relay_connection (relay_t *relay, relay_config_upstream_t *upstream)
238
{
239
    int redirects = 0;
240 241
    char *server_id = NULL;
    ice_config_t *config;
242 243
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
244 245 246
    char *server = strdup (_GET_UPSTREAM_SETTING(server));
    char *mount = strdup (_GET_UPSTREAM_SETTING(mount));
    int port = _GET_UPSTREAM_SETTING(port);
247
    char *auth_header;
248 249
    char header[4096];

250 251 252 253
    config = config_get_config ();
    server_id = strdup (config->server_id);
    config_release_config ();

254
    /* build any authentication header before connecting */
255
    if (_GET_UPSTREAM_SETTING(username) && _GET_UPSTREAM_SETTING(password))
256
    {
257
        char *esc_authorisation;
258
        unsigned len = strlen(_GET_UPSTREAM_SETTING(username)) + strlen(_GET_UPSTREAM_SETTING(password)) + 2;
259 260

        auth_header = malloc (len);
261
        snprintf (auth_header, len, "%s:%s", _GET_UPSTREAM_SETTING(username), _GET_UPSTREAM_SETTING(password));
262
        esc_authorisation = util_base64_encode(auth_header, len);
263 264 265 266 267 268 269 270 271
        free(auth_header);
        len = strlen (esc_authorisation) + 24;
        auth_header = malloc (len);
        snprintf (auth_header, len,
                "Authorization: Basic %s\r\n", esc_authorisation);
        free(esc_authorisation);
    }
    else
        auth_header = strdup ("");
272

273 274 275 276
    while (redirects < 10)
    {
        sock_t streamsock;

277
        ICECAST_LOG_INFO("connecting to %s:%d", server, port);
278

279
        streamsock = sock_connect_wto_bind (server, port, _GET_UPSTREAM_SETTING(bind), 10);
280 281
        if (streamsock == SOCK_ERROR)
        {
282
            ICECAST_LOG_WARN("Failed to connect to %s:%d", server, port);
283
            break;
284
        }
285
        con = connection_create(streamsock, NULL, NULL, strdup(server));
286

287 288 289 290 291 292
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
293
                "User-Agent: %s\r\n"
294
                "Host: %s\r\n"
295
                "%s"
296
                "%s"
297
                "\r\n",
298
                mount,
299
                server_id,
300
                server,
301
                _GET_UPSTREAM_SETTING(mp3metadata) ? "Icy-MetaData: 1\r\n" : "",
302
                auth_header);
303
        memset (header, 0, sizeof(header));
304
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
305
        {
306
            ICECAST_LOG_ERROR("Header read failed for %s (%s:%d%s)", relay->config->localmount, server, port, mount);
307 308 309 310
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
311
        if (! httpp_parse_response (parser, header, strlen(header), relay->config->localmount))
312
        {
313
            ICECAST_LOG_ERROR("Error parsing relay request for %s (%s:%d%s)", relay->config->localmount,
314
                    server, port, mount);
315 316
            break;
        }
317
        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
318
        {
319 320 321
            /* better retry the connection again but with different details */
            const char *uri, *mountpoint;
            int len;
322

323
            uri = httpp_getvar (parser, "location");
324
            ICECAST_LOG_INFO("redirect received %s", uri);
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
            if (strncmp (uri, "http://", 7) != 0)
                break;
            uri += 7;
            mountpoint = strchr (uri, '/');
            free (mount);
            if (mountpoint)
                mount = strdup (mountpoint);
            else
                mount = strdup ("/");

            len = strcspn (uri, ":/");
            port = 80;
            if (uri [len] == ':')
                port = atoi (uri+len+1);
            free (server);
            server = calloc (1, len+1);
            strncpy (server, uri, len);
            connection_close (con);
            httpp_destroy (parser);
344 345 346
            con = NULL;
            parser = NULL;
        }
347 348 349 350 351 352
        else
        {
            client_t *client = NULL;

            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
            {
353
                ICECAST_LOG_ERROR("Error from relay request: %s (%s)", relay->config->localmount,
354 355 356 357 358 359 360 361 362 363 364 365 366 367
                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                break;
            }
            global_lock ();
            if (client_create (&client, con, parser) < 0)
            {
                global_unlock ();
                /* make sure only the client_destory frees these */
                con = NULL;
                parser = NULL;
                client_destroy (client);
                break;
            }
            global_unlock ();
368
            sock_set_blocking (streamsock, 0);
369
            client_set_queue (client, NULL);
370
            client_complete(client);
371 372
            free (server);
            free (mount);
373
            free (server_id);
374 375 376 377 378 379 380 381 382
            free (auth_header);

            return client;
        }
        redirects++;
    }
    /* failed, better clean up */
    free (server);
    free (mount);
383
    free (server_id);
384 385 386 387 388 389 390 391 392 393 394 395 396 397
    free (auth_header);
    if (con)
        connection_close (con);
    if (parser)
        httpp_destroy (parser);
    return NULL;
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
static void *start_relay_stream (void *arg)
{
398
    relay_t *relay = arg;
399 400 401
    source_t *src = relay->source;
    client_t *client;

402
    ICECAST_LOG_INFO("Starting relayed source at mountpoint \"%s\"", relay->config->localmount);
403 404
    do
    {
405 406 407 408 409 410 411 412 413 414 415 416 417 418
        size_t i;

        for (i = 0; i < relay->config->upstreams; i++) {
            ICECAST_LOG_DEBUG("For relay on mount \"%s\", trying upstream #%zu", relay->config->localmount, i);
            client = open_relay_connection(relay, &(relay->config->upstream[i]));
            if (client)
                break;
        }

        /* if we have no upstreams defined, use the default upstream */
        if (!relay->config->upstreams) {
            ICECAST_LOG_DEBUG("For relay on mount \"%s\" with no upstreams trying upstream default", relay->config->localmount);
            client = open_relay_connection(relay, NULL);
        }
419 420 421 422 423 424 425

        if (client == NULL)
            continue;

        src->client = client;
        src->parser = client->parser;
        src->con = client->con;
426 427

        if (connection_complete_source (src, 0) < 0)
428
        {
429
            ICECAST_LOG_INFO("Failed to complete source initialisation");
430 431 432
            client_destroy (client);
            src->client = NULL;
            continue;
433
        }
434
        stats_event_inc(NULL, "source_relay_connections");
435
        stats_event (relay->config->localmount, "source_ip", client->con->ip);
436

437 438
        source_main (relay->source);

439
        if (relay->config->on_demand == 0)
440 441
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
442
            yp_remove (relay->config->localmount);
443
            relay->source->yp_public = -1;
444
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
445
            slave_update_all_mounts();
446 447
        }

448
        /* we've finished, now get cleaned up */
449
        relay->cleanup = 1;
450
        slave_rebuild_mounts();
451 452

        return NULL;
453
    } while (0); /* TODO allow looping through multiple servers */
454

455 456 457 458
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

459
        ICECAST_LOG_DEBUG("failed relay, fallback to %s", relay->source->fallback_mount);
460
        avl_tree_rlock(global.source_tree);
461
        fallback_source = source_find_mount(relay->source->fallback_mount);
462 463

        if (fallback_source != NULL)
464
            source_move_clients(relay->source, fallback_source);
465

466
        avl_tree_unlock(global.source_tree);
467 468
    }

469
    source_clear_source(relay->source);
470

471
    /* cleanup relay, but prevent this relay from starting up again too soon */
472 473
    thread_mutex_lock(&_slave_mutex);
    thread_mutex_lock(&(config_locks()->relay_lock));
Karl Heyes's avatar
Karl Heyes committed
474
    relay->source->on_demand = 0;
475
    relay->start = time(NULL) + max_interval;
476
    relay->cleanup = 1;
477 478
    thread_mutex_unlock(&(config_locks()->relay_lock));
    thread_mutex_unlock(&_slave_mutex);
479 480

    return NULL;
481 482 483 484
}


/* wrapper for starting the provided relay stream */
485
static void check_relay_stream (relay_t *relay)
486 487 488
{
    if (relay->source == NULL)
    {
489
        if (relay->config->localmount[0] != '/')
490
        {
491
            ICECAST_LOG_WARN("relay mountpoint \"%s\" does not start with /, skipping",
492
                    relay->config->localmount);
493 494
            return;
        }
495
        /* new relay, reserve the name */
496
        relay->source = source_reserve (relay->config->localmount);
497
        if (relay->source)
498
        {
499 500
            ICECAST_LOG_DEBUG("Adding relay source at mountpoint \"%s\"", relay->config->localmount);
            if (relay->config->on_demand)
Karl Heyes's avatar
Karl Heyes committed
501 502
            {
                ice_config_t *config = config_get_config ();
503 504
                mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
                relay->source->on_demand = relay->config->on_demand;
Karl Heyes's avatar
Karl Heyes committed
505 506 507
                if (mountinfo == NULL)
                    source_update_settings (config, relay->source, mountinfo);
                config_release_config ();
508
                stats_event (relay->config->localmount, "listeners", "0");
509
                slave_update_all_mounts();
Karl Heyes's avatar
Karl Heyes committed
510
            }
511
        }
512
        else
513 514 515
        {
            if (relay->start == 0)
            {
516
                ICECAST_LOG_WARN("new relay but source \"%s\" already exists", relay->config->localmount);
517 518 519 520
                relay->start = 1;
            }
            return;
        }
521
    }
522
    do
523
    {
524
        source_t *source = relay->source;
525 526
        /* skip relay if active, not configured or just not time yet */
        if (relay->source == NULL || relay->running || relay->start > time(NULL))
527
            break;
528
        /* check if an inactive on-demand relay has a fallback that has listeners */
529
        if (relay->config->on_demand && source->on_demand_req == 0)
530
        {
531
            relay->source->on_demand = relay->config->on_demand;
532 533 534 535 536 537 538 539

            if (source->fallback_mount && source->fallback_override)
            {
                source_t *fallback;
                avl_tree_rlock (global.source_tree);
                fallback = source_find_mount (source->fallback_mount);
                if (fallback && fallback->running && fallback->listeners)
                {
540
                   ICECAST_LOG_DEBUG("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
541 542 543 544 545 546 547 548
                   source->on_demand_req = 1;
                }
                avl_tree_unlock (global.source_tree);
            }
            if (source->on_demand_req == 0)
                break;
        }

549
        relay->start = time(NULL) + 5;
550
        relay->running = 1;
551 552 553
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
554

555
    } while (0);
556
    /* the relay thread may of shut down itself */
557
    if (relay->cleanup)
558
    {
559 560
        if (relay->thread)
        {
561
            ICECAST_LOG_DEBUG("waiting for relay thread for \"%s\"", relay->config->localmount);
562 563 564
            thread_join (relay->thread);
            relay->thread = NULL;
        }
565 566
        relay->cleanup = 0;
        relay->running = 0;
567

568
        if (relay->config->on_demand && relay->source)
569 570
        {
            ice_config_t *config = config_get_config ();
571
            mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
572 573
            source_update_settings (config, relay->source, mountinfo);
            config_release_config ();
574
            stats_event (relay->config->localmount, "listeners", "0");
575
        }
576
    }
577 578
}

579

580 581 582
/* compare the 2 relays to see if there are any changes, return 1 if
 * the relay needs to be restarted, 0 otherwise
 */
583 584 585
#define _EQ_STR(a,b) (((a) == (b)) || ((a) != NULL && (b) != NULL && strcmp((a), (b)) == 0))
#define _EQ_ATTR(x) (_EQ_STR((new->x), (old->x)))
static int relay_has_changed_upstream(const relay_config_upstream_t *new, const relay_config_upstream_t *old)
586
{
587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628
    if (new->mp3metadata != old->mp3metadata)
        return 1;

    if (!_EQ_ATTR(server) || new->port != old->port)
        return 1;

    if (!_EQ_ATTR(mount))
        return 1;

/* NOTE: We currently do not consider this a relevant change. Why?
    if (!_EQ_ATTR(username) || !_EQ_ATTR(password))
        return 1;

    if (!_EQ_ATTR(bind))
        return 1;
*/

    return 0;
}

/* Compare two relay configurations. Returns 1 if the number of upstreams,
 * any upstream entry or the default upstream differs, 0 otherwise.
 *
 * Side effect: when nothing relevant differs, old->on_demand is overwritten
 * with new->on_demand, so an on-demand change is taken over without
 * reporting the relay as changed.
 */
static int relay_has_changed (const relay_config_t *new, relay_config_t *old)
{
    size_t i;

    /* This is not fully true: If more upstreams has been added there is no reason
     * to restart the relay. However for now we ignore this case. TODO: Change this.
     */
    if (new->upstreams != old->upstreams)
        return 1;

    for (i = 0; i < new->upstreams; i++) {
        if (relay_has_changed_upstream(&(new->upstream[i]), &(old->upstream[i])))
            return 1;
    }

    if (relay_has_changed_upstream(&(new->upstream_default), &(old->upstream_default)))
        return 1;

    /* Why do we do this here? */
    old->on_demand = new->on_demand;

    return 0;
}


632 633 634 635
/* go through updated looking for relays that are different configured. The
 * returned list contains relays that should be kept running, current contains
 * the list of relays to shutdown
 */
636 637
static relay_t *
update_relay_set(relay_t **current, relay_config_t **updated, size_t updated_length)
638
{
639 640 641 642 643 644 645
    relay_config_t *relay;
    relay_t *existing_relay, **existing_p;
    relay_t *new_list = NULL;
    size_t i;

    for (i = 0; i < updated_length; i++) {
        relay = updated[i];
646

647 648 649 650 651 652
        existing_relay = *current;
        existing_p = current;

        while (existing_relay)
        {
            /* break out if keeping relay */
653 654
            if (strcmp(relay->localmount, existing_relay->config->localmount) == 0)
                if (relay_has_changed(relay, existing_relay->config) == 0)
655
                    break;
656
            existing_p = &existing_relay->next;
657

658 659
            existing_relay = existing_relay->next;
        }
660 661


662 663 664
        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
665
            existing_relay = relay_new(relay);
666 667 668 669 670 671 672
        }
        else
        {
            *existing_p = existing_relay->next;
        }
        existing_relay->next = new_list;
        new_list = existing_relay;
673
    }
674

675 676 677 678 679 680
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
681
 * are separated and returned in a separate list
682
 */
683 684
static relay_t *
update_relays (relay_t **relay_list, relay_config_t **new_relay_list, size_t new_relay_list_length)
685
{
686
    relay_t *active_relays, *cleanup_relays;
687

688
    active_relays = update_relay_set(relay_list, new_relay_list, new_relay_list_length);
689

690 691 692 693 694 695 696 697
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


698 699
/* Shut down and free every relay in to_free, then run check_relay_stream()
 * on every relay in to_start.
 *
 * to_start   - list of relays to keep running (may be NULL).
 * to_free    - list of relays to stop and release (may be NULL); the list is
 *              consumed.
 * skip_timer - when non-zero, each relay's start time is reset to 0 before
 *              it is checked, so it is not held back by its start delay.
 */
static void relay_check_streams (relay_t *to_start,
        relay_t *to_free, int skip_timer)
{
    relay_t *relay;

    while (to_free)
    {
        if (to_free->source)
        {
            if (to_free->running)
            {
                /* relay has been removed from xml, shut down active relay */
                ICECAST_LOG_DEBUG("source shutdown request on \"%s\"", to_free->config->localmount);
                to_free->running = 0;
                to_free->source->running = 0;
                thread_join (to_free->thread);
            }
            else
                stats_event (to_free->config->localmount, NULL, NULL);
        }
        to_free = relay_free (to_free);
    }

    relay = to_start;
    while (relay)
    {
        if (skip_timer)
            relay->start = 0;
        check_relay_stream (relay);
        relay = relay->next;
    }
}

731 732 733 734 735

static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username= NULL;
    int port;
736
    sock_t mastersock;
737
    int ret = 0;
738
    char buf[256];
739 740 741
    do
    {
        char *authheader, *data;
742 743 744
        relay_t *cleanup_relays;
        relay_config_t **new_relays = NULL;
        size_t new_relays_length = 0;
745
        int len, count = 1;
746
        int on_demand;
747
        size_t i;
748

749
        username = strdup(config->master_username);
750
        if (config->master_password)
751
            password = strdup(config->master_password);
752

753
        if (config->master_server)
754
            master = strdup(config->master_server);
755

756
        port = config->master_server_port;
Michael Smith's avatar
Michael Smith committed
757

758 759
        if (password == NULL || master == NULL || port == 0)
            break;
760
        on_demand = config->on_demand;
761 762
        ret = 1;
        config_release_config();
763
        mastersock = sock_connect_wto(master, port, 10);
764

765 766
        if (mastersock == SOCK_ERROR)
        {
767
            ICECAST_LOG_WARN("Relay slave failed to contact master server to fetch stream list");
768 769
            break;
        }
Michael Smith's avatar
Michael Smith committed
770

771 772 773
        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        snprintf (authheader, len, "%s:%s", username, password);
774
        data = util_base64_encode(authheader, len);
775 776 777 778 779 780 781
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(authheader);
        free(data);

782
        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
783
                ((strncmp (buf, "HTTP/1.0 200", 12) != 0) && (strncmp (buf, "HTTP/1.1 200", 12) != 0)))
784 785
        {
            sock_close (mastersock);
786
            ICECAST_LOG_WARN("Master rejected streamlist request");
787
            break;
788 789
        } else {
            ICECAST_LOG_INFO("Master accepted streamlist request");
790 791
        }

792 793 794 795 796 797 798
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
799 800 801
            relay_config_t *c = NULL;
            relay_config_t **n;

802 803
            if (!strlen(buf))
                continue;
804
            ICECAST_LOG_DEBUG("read %d from master \"%s\"", count++, buf);
805 806
            xmlURIPtr parsed_uri = xmlParseURI(buf);
            if (parsed_uri == NULL) {
807
                ICECAST_LOG_DEBUG("Error while parsing line from master. Ignoring line.");
808 809
                continue;
            }
810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829

            n = realloc(new_relays, sizeof(*new_relays)*(new_relays_length + 1));
            if (n) {
                new_relays = n;

                c = calloc(1, sizeof(*c));
                new_relays[new_relays_length++] = c;
            }

            if (c) {
                if (parsed_uri->server != NULL) {
                    c->upstream_default.server = strdup(parsed_uri->server);
                    if (parsed_uri->port == 0) {
                        c->upstream_default.port = 80;
                    } else {
                        c->upstream_default.port = parsed_uri->port;
                    }
                } else {
                    c->upstream_default.server = (char *)xmlCharStrdup (master);
                    c->upstream_default.port = port;
830 831
                }

832 833 834 835 836
                c->upstream_default.mount = strdup(parsed_uri->path);
                c->localmount = strdup(parsed_uri->path);
                c->upstream_default.mp3metadata = 1;
                c->on_demand = on_demand;
                ICECAST_LOG_DEBUG("Added relay host=\"%s\", port=%d, mount=\"%s\"", c->upstream_default.server, c->upstream_default.port, c->upstream_default.mount);
837
            }
838
            xmlFreeURI(parsed_uri);
Michael Smith's avatar
Michael Smith committed
839
        }
840 841
        sock_close (mastersock);

842
        thread_mutex_lock (&(config_locks()->relay_lock));
843
        cleanup_relays = update_relays (&global.master_relays, new_relays, new_relays_length);
844

845
        relay_check_streams (global.master_relays, cleanup_relays, 0);
846 847 848 849 850

        for (i = 0; i < new_relays_length; i++) {
            relay_config_free(new_relays[i]);
        }
        free(new_relays);
851 852 853

        thread_mutex_unlock (&(config_locks()->relay_lock));

854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869
    } while(0);

    if (master)
        free (master);
    if (username)
        free (username);
    if (password)
        free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
870
    unsigned int interval = 0;
871

872 873
    (void)arg;

874
    thread_mutex_lock(&_slave_mutex);
875
    update_settings = 0;
876
    update_all_mounts = 0;
877
    thread_mutex_unlock(&_slave_mutex);
878

879
    config = config_get_config();
880
    stats_global(config);
881
    config_release_config();
882
    source_recheck_mounts(1);
883

884
    while (1)
885
    {
886
        relay_t *cleanup_relays = NULL;
887
        int skip_timer = 0;
888

889
        /* re-read xml file if requested */
890
        global_lock();
891 892
        if (global.schedule_config_reread) {
            config_reread_config();
893
            global.schedule_config_reread = 0;
894
        }
895
        global_unlock();
896

897
        thread_sleep(1000000);
898 899
        if (slave_running == 0)
            break;
900 901

        ++interval;
902

903
        /* only update relays lists when required */
904
        thread_mutex_lock(&_slave_mutex);
905 906
        if (max_interval <= interval)
        {
907
            ICECAST_LOG_DEBUG("checking master stream list");
908
            config = config_get_config();
909

910 911
            if (max_interval == 0)
                skip_timer = 1;
912 913
            interval = 0;
            max_interval = config->master_update_interval;
914
            thread_mutex_unlock(&_slave_mutex);
915

916 917 918
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
919

920
            thread_mutex_lock (&(config_locks()->relay_lock));
921

922
            cleanup_relays = update_relays(&global.relays, config->relay, config->relay_length);
923

924 925 926
            config_release_config();
        }
        else
927 928
        {
            thread_mutex_unlock(&_slave_mutex);
929
            thread_mutex_lock (&(config_locks()->relay_lock));
930
        }
931 932 933 934 935

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

936
        thread_mutex_lock(&_slave_mutex);
937 938
        if (update_settings)
        {
939
            source_recheck_mounts (update_all_mounts);
940
            update_settings = 0;
941
            update_all_mounts = 0;
942
        }
943
        thread_mutex_unlock(&_slave_mutex);
944
    }
945
    ICECAST_LOG_INFO("shutting down current relays");
946 947
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
948

949
    ICECAST_LOG_INFO("Slave thread shutdown complete");
950

951
    return NULL;
952
}
953