slave.c 28.3 KB
Newer Older
1 2 3 4 5
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
6
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
7 8 9 10 11 12
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17 18 19 20
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

21 22 23 24
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

25 26 27 28 29 30 31 32 33 34 35 36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#endif

37
#include "compat.h"
38

39
#include <libxml/uri.h>
40 41 42 43
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "common/net/sock.h"
#include "common/httpp/httpp.h"
44

45
#include "slave.h"
46
#include "cfgfile.h"
47 48 49 50 51 52 53 54
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
55
#include "format.h"
56 57 58

#define CATMODULE "slave"

59 60 61 62 63 64 65 66 67 68
struct relay_tag {
    relay_config_t *config;
    source_t *source;
    int running;
    int cleanup;
    time_t start;
    thread_type *thread;
    relay_t *next;
};

69
static void *_slave_thread(void *arg);
70
static thread_type *_slave_thread_id;
71
static int slave_running = 0;
72
static volatile int update_settings = 0;
73
static volatile int update_all_mounts = 0;
74
static volatile unsigned int max_interval = 0;
75
static mutex_t _slave_mutex; // protects slave_running, update_settings, update_all_mounts, max_interval
76

77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
static inline void relay_config_upstream_free (relay_config_upstream_t *upstream)
{
    if (upstream->server)
        xmlFree(upstream->server);
    if (upstream->mount)
        xmlFree(upstream->mount);
    if (upstream->username)
        xmlFree(upstream->username);
    if (upstream->password)
        xmlFree(upstream->password);
}

void relay_config_free (relay_config_t *relay)
{
    size_t i;

    ICECAST_LOG_DEBUG("freeing relay config for %s", relay->localmount);

    for (i = 0; i < relay->upstreams; i++) {
        relay_config_upstream_free(&(relay->upstream[i]));
    }

    relay_config_upstream_free(&(relay->upstream_default));

    xmlFree(relay->localmount);
    free(relay->upstream);
    free(relay);
}

relay_t *relay_free (relay_t *relay)
107
{
108 109 110 111
    relay_t *next = relay->next;

    ICECAST_LOG_DEBUG("freeing relay %s", relay->config->localmount);

112 113
    if (relay->source)
       source_free_source (relay->source);
114 115 116 117

    relay_config_free(relay->config);

    free(relay);
118
    return next;
119 120
}

121

122
static inline void relay_config_upstream_copy(relay_config_upstream_t *dst, const relay_config_upstream_t *src)
123
{
124 125
    dst->server = (char *)xmlCharStrdup(src->server);
    dst->mount = (char *)xmlCharStrdup(src->mount);
126

127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
    if (src->username)
        dst->username = (char *)xmlCharStrdup(src->username);
    if (src->password)
        dst->password = (char *)xmlCharStrdup(src->password);

    dst->port = src->port;

    dst->mp3metadata = src->mp3metadata;
}

static inline relay_config_t *relay_config_copy (relay_config_t *r)
{
    relay_config_t *copy = calloc (1, sizeof (relay_config_t));
    relay_config_upstream_t *u = NULL;
    size_t i;

    if (r->upstreams) {
        u = calloc(r->upstreams, sizeof(relay_config_upstream_t));
        if (!u) {
            free(copy);
            return NULL;
        }
    }

    if (!copy) {
        free(u);
        return NULL;
154
    }
155 156

    copy->upstream = u;
157
    copy->upstreams = r->upstreams;
158 159 160 161 162 163 164

    copy->localmount = (char *)xmlCharStrdup(r->localmount);
    copy->on_demand = r->on_demand;

    relay_config_upstream_copy(&(copy->upstream_default), &(r->upstream_default));

    for (i = 0; i < r->upstreams; i++)
165
        relay_config_upstream_copy(&(copy->upstream[i]), &(r->upstream[i]));
166 167


168 169
    return copy;
}
170

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
static inline relay_t *relay_new(relay_config_t *config)
{
    relay_t *r = calloc(1, sizeof(*r));

    if (!r)
        return NULL;

    r->config = relay_config_copy(config);
    if (!r->config) {
        free(r);
        return NULL;
    }

    return r;
}
186

187
/* force a recheck of the relays. This will recheck the master server if
188
 * this is a slave and rebuild all mountpoints in the stats tree
189
 */
190
void slave_update_all_mounts(void)
191
{
192
    thread_mutex_lock(&_slave_mutex);
193
    max_interval = 0;
194
    update_all_mounts = 1;
195
    update_settings = 1;
196
    thread_mutex_unlock(&_slave_mutex);
197 198
}

199 200 201

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
202
 */
203
void slave_rebuild_mounts(void)
204
{
205
    thread_mutex_lock(&_slave_mutex);
206
    update_settings = 1;
207
    thread_mutex_unlock(&_slave_mutex);
208 209
}

210 211

void slave_initialize(void)
212
{
213 214
    if (slave_running)
        return;
215

216
    slave_running = 1;
217
    max_interval = 0;
218
    thread_mutex_create (&_slave_mutex);
219 220
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
221

222

223 224
void slave_shutdown(void)
{
225 226 227
    thread_mutex_lock(&_slave_mutex);
    if (!slave_running) {
        thread_mutex_unlock(&_slave_mutex);
228
        return;
229
    }
230
    slave_running = 0;
231 232
    thread_mutex_unlock(&_slave_mutex);

233
    ICECAST_LOG_DEBUG("waiting for slave thread");
234 235 236 237
    thread_join (_slave_thread_id);
}


238 239
/* Actually open the connection and do some http parsing, handle any 302
 * responses within here.
240
 */
241 242
#define _GET_UPSTREAM_SETTING(n) ((upstream && upstream->n) ? upstream->n : relay->config->upstream_default.n)
static client_t *open_relay_connection (relay_t *relay, relay_config_upstream_t *upstream)
243
{
244
    int redirects = 0;
245 246
    char *server_id = NULL;
    ice_config_t *config;
247 248
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
249 250 251
    char *server = strdup (_GET_UPSTREAM_SETTING(server));
    char *mount = strdup (_GET_UPSTREAM_SETTING(mount));
    int port = _GET_UPSTREAM_SETTING(port);
252
    char *auth_header;
253 254
    char header[4096];

255 256 257 258
    config = config_get_config ();
    server_id = strdup (config->server_id);
    config_release_config ();

259
    /* build any authentication header before connecting */
260
    if (_GET_UPSTREAM_SETTING(username) && _GET_UPSTREAM_SETTING(password))
261
    {
262
        char *esc_authorisation;
263
        unsigned len = strlen(_GET_UPSTREAM_SETTING(username)) + strlen(_GET_UPSTREAM_SETTING(password)) + 2;
264 265

        auth_header = malloc (len);
266
        snprintf (auth_header, len, "%s:%s", _GET_UPSTREAM_SETTING(username), _GET_UPSTREAM_SETTING(password));
267
        esc_authorisation = util_base64_encode(auth_header, len);
268 269 270 271 272 273 274 275 276
        free(auth_header);
        len = strlen (esc_authorisation) + 24;
        auth_header = malloc (len);
        snprintf (auth_header, len,
                "Authorization: Basic %s\r\n", esc_authorisation);
        free(esc_authorisation);
    }
    else
        auth_header = strdup ("");
277

278 279 280 281
    while (redirects < 10)
    {
        sock_t streamsock;

282
        ICECAST_LOG_INFO("connecting to %s:%d", server, port);
283

284
        streamsock = sock_connect_wto_bind (server, port, _GET_UPSTREAM_SETTING(bind), 10);
285 286
        if (streamsock == SOCK_ERROR)
        {
287
            ICECAST_LOG_WARN("Failed to connect to %s:%d", server, port);
288
            break;
289
        }
290
        con = connection_create(streamsock, NULL, NULL, strdup(server));
291

292 293 294 295 296 297
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
298
                "User-Agent: %s\r\n"
299
                "Host: %s\r\n"
300
                "%s"
301
                "%s"
302
                "\r\n",
303
                mount,
304
                server_id,
305
                server,
306
                _GET_UPSTREAM_SETTING(mp3metadata) ? "Icy-MetaData: 1\r\n" : "",
307
                auth_header);
308
        memset (header, 0, sizeof(header));
309
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
310
        {
311
            ICECAST_LOG_ERROR("Header read failed for %s (%s:%d%s)", relay->config->localmount, server, port, mount);
312 313 314 315
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
316
        if (! httpp_parse_response (parser, header, strlen(header), relay->config->localmount))
317
        {
318
            ICECAST_LOG_ERROR("Error parsing relay request for %s (%s:%d%s)", relay->config->localmount,
319
                    server, port, mount);
320 321
            break;
        }
322
        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
323
        {
324 325 326
            /* better retry the connection again but with different details */
            const char *uri, *mountpoint;
            int len;
327

328
            uri = httpp_getvar (parser, "location");
329
            ICECAST_LOG_INFO("redirect received %s", uri);
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
            if (strncmp (uri, "http://", 7) != 0)
                break;
            uri += 7;
            mountpoint = strchr (uri, '/');
            free (mount);
            if (mountpoint)
                mount = strdup (mountpoint);
            else
                mount = strdup ("/");

            len = strcspn (uri, ":/");
            port = 80;
            if (uri [len] == ':')
                port = atoi (uri+len+1);
            free (server);
            server = calloc (1, len+1);
            strncpy (server, uri, len);
            connection_close (con);
            httpp_destroy (parser);
349 350 351
            con = NULL;
            parser = NULL;
        }
352 353 354 355 356 357
        else
        {
            client_t *client = NULL;

            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
            {
358
                ICECAST_LOG_ERROR("Error from relay request: %s (%s)", relay->config->localmount,
359 360 361 362 363 364 365 366 367 368 369 370 371 372
                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                break;
            }
            global_lock ();
            if (client_create (&client, con, parser) < 0)
            {
                global_unlock ();
                /* make sure only the client_destory frees these */
                con = NULL;
                parser = NULL;
                client_destroy (client);
                break;
            }
            global_unlock ();
373
            sock_set_blocking (streamsock, 0);
374
            client_set_queue (client, NULL);
375
            client_complete(client);
376 377
            free (server);
            free (mount);
378
            free (server_id);
379 380 381 382 383 384 385 386 387
            free (auth_header);

            return client;
        }
        redirects++;
    }
    /* failed, better clean up */
    free (server);
    free (mount);
388
    free (server_id);
389 390 391 392 393 394 395 396 397 398 399 400 401 402
    free (auth_header);
    if (con)
        connection_close (con);
    if (parser)
        httpp_destroy (parser);
    return NULL;
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
static void *start_relay_stream (void *arg)
{
403
    relay_t *relay = arg;
404 405 406
    source_t *src = relay->source;
    client_t *client;

407
    ICECAST_LOG_INFO("Starting relayed source at mountpoint \"%s\"", relay->config->localmount);
408 409
    do
    {
410 411 412 413 414 415 416 417 418 419 420 421 422 423
        size_t i;

        for (i = 0; i < relay->config->upstreams; i++) {
            ICECAST_LOG_DEBUG("For relay on mount \"%s\", trying upstream #%zu", relay->config->localmount, i);
            client = open_relay_connection(relay, &(relay->config->upstream[i]));
            if (client)
                break;
        }

        /* if we have no upstreams defined, use the default upstream */
        if (!relay->config->upstreams) {
            ICECAST_LOG_DEBUG("For relay on mount \"%s\" with no upstreams trying upstream default", relay->config->localmount);
            client = open_relay_connection(relay, NULL);
        }
424 425 426 427 428 429 430

        if (client == NULL)
            continue;

        src->client = client;
        src->parser = client->parser;
        src->con = client->con;
431 432

        if (connection_complete_source (src, 0) < 0)
433
        {
434
            ICECAST_LOG_INFO("Failed to complete source initialisation");
435 436 437
            client_destroy (client);
            src->client = NULL;
            continue;
438
        }
439
        stats_event_inc(NULL, "source_relay_connections");
440
        stats_event (relay->config->localmount, "source_ip", client->con->ip);
441

442 443
        source_main (relay->source);

444
        if (relay->config->on_demand == 0)
445 446
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
447
            yp_remove (relay->config->localmount);
448
            relay->source->yp_public = -1;
449
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
450
            slave_update_all_mounts();
451 452
        }

453
        /* we've finished, now get cleaned up */
454
        relay->cleanup = 1;
455
        slave_rebuild_mounts();
456 457

        return NULL;
458
    } while (0); /* TODO allow looping through multiple servers */
459

460 461 462 463
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

464
        ICECAST_LOG_DEBUG("failed relay, fallback to %s", relay->source->fallback_mount);
465
        avl_tree_rlock(global.source_tree);
466
        fallback_source = source_find_mount(relay->source->fallback_mount);
467 468

        if (fallback_source != NULL)
469
            source_move_clients(relay->source, fallback_source);
470

471
        avl_tree_unlock(global.source_tree);
472 473
    }

474
    source_clear_source(relay->source);
475

476
    /* cleanup relay, but prevent this relay from starting up again too soon */
477 478
    thread_mutex_lock(&_slave_mutex);
    thread_mutex_lock(&(config_locks()->relay_lock));
Karl Heyes's avatar
Karl Heyes committed
479
    relay->source->on_demand = 0;
480
    relay->start = time(NULL) + max_interval;
481
    relay->cleanup = 1;
482 483
    thread_mutex_unlock(&(config_locks()->relay_lock));
    thread_mutex_unlock(&_slave_mutex);
484 485

    return NULL;
486 487 488 489
}


/* wrapper for starting the provided relay stream */
490
static void check_relay_stream (relay_t *relay)
491 492 493
{
    if (relay->source == NULL)
    {
494
        if (relay->config->localmount[0] != '/')
495
        {
496
            ICECAST_LOG_WARN("relay mountpoint \"%s\" does not start with /, skipping",
497
                    relay->config->localmount);
498 499
            return;
        }
500
        /* new relay, reserve the name */
501
        relay->source = source_reserve (relay->config->localmount);
502
        if (relay->source)
503
        {
504 505
            ICECAST_LOG_DEBUG("Adding relay source at mountpoint \"%s\"", relay->config->localmount);
            if (relay->config->on_demand)
Karl Heyes's avatar
Karl Heyes committed
506 507
            {
                ice_config_t *config = config_get_config ();
508 509
                mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
                relay->source->on_demand = relay->config->on_demand;
Karl Heyes's avatar
Karl Heyes committed
510 511 512
                if (mountinfo == NULL)
                    source_update_settings (config, relay->source, mountinfo);
                config_release_config ();
513
                stats_event (relay->config->localmount, "listeners", "0");
514
                slave_update_all_mounts();
Karl Heyes's avatar
Karl Heyes committed
515
            }
516
        }
517
        else
518 519 520
        {
            if (relay->start == 0)
            {
521
                ICECAST_LOG_WARN("new relay but source \"%s\" already exists", relay->config->localmount);
522 523 524 525
                relay->start = 1;
            }
            return;
        }
526
    }
527
    do
528
    {
529
        source_t *source = relay->source;
530 531
        /* skip relay if active, not configured or just not time yet */
        if (relay->source == NULL || relay->running || relay->start > time(NULL))
532
            break;
533
        /* check if an inactive on-demand relay has a fallback that has listeners */
534
        if (relay->config->on_demand && source->on_demand_req == 0)
535
        {
536
            relay->source->on_demand = relay->config->on_demand;
537 538 539 540 541 542 543 544

            if (source->fallback_mount && source->fallback_override)
            {
                source_t *fallback;
                avl_tree_rlock (global.source_tree);
                fallback = source_find_mount (source->fallback_mount);
                if (fallback && fallback->running && fallback->listeners)
                {
545
                   ICECAST_LOG_DEBUG("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
546 547 548 549 550 551 552 553
                   source->on_demand_req = 1;
                }
                avl_tree_unlock (global.source_tree);
            }
            if (source->on_demand_req == 0)
                break;
        }

554
        relay->start = time(NULL) + 5;
555
        relay->running = 1;
556 557 558
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
559

560
    } while (0);
561
    /* the relay thread may of shut down itself */
562
    if (relay->cleanup)
563
    {
564 565
        if (relay->thread)
        {
566
            ICECAST_LOG_DEBUG("waiting for relay thread for \"%s\"", relay->config->localmount);
567 568 569
            thread_join (relay->thread);
            relay->thread = NULL;
        }
570 571
        relay->cleanup = 0;
        relay->running = 0;
572

573
        if (relay->config->on_demand && relay->source)
574 575
        {
            ice_config_t *config = config_get_config ();
576
            mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
577 578
            source_update_settings (config, relay->source, mountinfo);
            config_release_config ();
579
            stats_event (relay->config->localmount, "listeners", "0");
580
        }
581
    }
582 583
}

584

585 586 587
/* compare the 2 relays to see if there are any changes, return 1 if
 * the relay needs to be restarted, 0 otherwise
 */
588 589 590
#define _EQ_STR(a,b) (((a) == (b)) || ((a) != NULL && (b) != NULL && strcmp((a), (b)) == 0))
#define _EQ_ATTR(x) (_EQ_STR((new->x), (old->x)))
static int relay_has_changed_upstream(const relay_config_upstream_t *new, const relay_config_upstream_t *old)
591
{
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633
    if (new->mp3metadata != old->mp3metadata)
        return 1;

    if (!_EQ_ATTR(server) || new->port != old->port)
        return 1;

    if (!_EQ_ATTR(mount))
        return 1;

/* NOTE: We currently do not consider this a relevant change. Why?
    if (!_EQ_ATTR(username) || !_EQ_ATTR(password))
        return 1;

    if (!_EQ_ATTR(bind))
        return 1;
*/

    return 0;
}

static int relay_has_changed (const relay_config_t *new, relay_config_t *old)
{
    size_t i;

    /* This is not fully true: If more upstreams has been added there is no reason
     * to restart the relay. However for now we ignore this case. TODO: Change this.
     */
    if (new->upstreams != old->upstreams)
        return 1;

    for (i = 0; i < new->upstreams; i++) {
        if (relay_has_changed_upstream(&(new->upstream[i]), &(old->upstream[i])))
            return 1;
    }

    if (relay_has_changed_upstream(&(new->upstream_default), &(old->upstream_default)))
        return 1;

    /* Why do we do this here? */
    old->on_demand = new->on_demand;

    return 0;
634 635 636
}


637 638 639 640
/* go through updated looking for relays that are different configured. The
 * returned list contains relays that should be kept running, current contains
 * the list of relays to shutdown
 */
641 642
static relay_t *
update_relay_set(relay_t **current, relay_config_t **updated, size_t updated_length)
643
{
644 645 646 647 648 649 650
    relay_config_t *relay;
    relay_t *existing_relay, **existing_p;
    relay_t *new_list = NULL;
    size_t i;

    for (i = 0; i < updated_length; i++) {
        relay = updated[i];
651

652 653 654 655 656 657
        existing_relay = *current;
        existing_p = current;

        while (existing_relay)
        {
            /* break out if keeping relay */
658 659
            if (strcmp(relay->localmount, existing_relay->config->localmount) == 0)
                if (relay_has_changed(relay, existing_relay->config) == 0)
660
                    break;
661
            existing_p = &existing_relay->next;
662

663 664
            existing_relay = existing_relay->next;
        }
665 666


667 668 669
        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
670
            existing_relay = relay_new(relay);
671 672 673 674 675 676 677
        }
        else
        {
            *existing_p = existing_relay->next;
        }
        existing_relay->next = new_list;
        new_list = existing_relay;
678
    }
679

680 681 682 683 684 685
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
686
 * are separated and returned in a separate list
687
 */
688 689
static relay_t *
update_relays (relay_t **relay_list, relay_config_t **new_relay_list, size_t new_relay_list_length)
690
{
691
    relay_t *active_relays, *cleanup_relays;
692

693
    active_relays = update_relay_set(relay_list, new_relay_list, new_relay_list_length);
694

695 696 697 698 699 700 701 702
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


703 704
static void relay_check_streams (relay_t *to_start,
        relay_t *to_free, int skip_timer)
705
{
706
    relay_t *relay;
707 708

    while (to_free)
709
    {
710
        if (to_free->source)
711
        {
712 713 714
            if (to_free->running)
            {
                /* relay has been removed from xml, shut down active relay */
715
                ICECAST_LOG_DEBUG("source shutdown request on \"%s\"", to_free->config->localmount);
716
                to_free->running = 0;
717 718 719 720
                to_free->source->running = 0;
                thread_join (to_free->thread);
            }
            else
721
                stats_event (to_free->config->localmount, NULL, NULL);
722
        }
723 724 725 726 727 728
        to_free = relay_free (to_free);
    }

    relay = to_start;
    while (relay)
    {
729 730
        if (skip_timer)
            relay->start = 0;
731 732
        check_relay_stream (relay);
        relay = relay->next;
733
    }
734 735
}

736 737 738 739 740

static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username= NULL;
    int port;
741
    sock_t mastersock;
742
    int ret = 0;
743
    char buf[256];
744 745 746
    do
    {
        char *authheader, *data;
747 748 749
        relay_t *cleanup_relays;
        relay_config_t **new_relays = NULL;
        size_t new_relays_length = 0;
750
        int len, count = 1;
751
        int on_demand;
752
        size_t i;
753

754
        username = strdup(config->master_username);
755
        if (config->master_password)
756
            password = strdup(config->master_password);
757

758
        if (config->master_server)
759
            master = strdup(config->master_server);
760

761
        port = config->master_server_port;
Michael Smith's avatar
Michael Smith committed
762

763 764
        if (password == NULL || master == NULL || port == 0)
            break;
765
        on_demand = config->on_demand;
766 767
        ret = 1;
        config_release_config();
768
        mastersock = sock_connect_wto(master, port, 10);
769

770 771
        if (mastersock == SOCK_ERROR)
        {
772
            ICECAST_LOG_WARN("Relay slave failed to contact master server to fetch stream list");
773 774
            break;
        }
Michael Smith's avatar
Michael Smith committed
775

776 777 778
        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        snprintf (authheader, len, "%s:%s", username, password);
779
        data = util_base64_encode(authheader, len);
780 781 782 783 784 785 786
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(authheader);
        free(data);

787
        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
788
                ((strncmp (buf, "HTTP/1.0 200", 12) != 0) && (strncmp (buf, "HTTP/1.1 200", 12) != 0)))
789 790
        {
            sock_close (mastersock);
791
            ICECAST_LOG_WARN("Master rejected streamlist request");
792
            break;
793 794
        } else {
            ICECAST_LOG_INFO("Master accepted streamlist request");
795 796
        }

797 798 799 800 801 802 803
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
804 805 806
            relay_config_t *c = NULL;
            relay_config_t **n;

807 808
            if (!strlen(buf))
                continue;
809
            ICECAST_LOG_DEBUG("read %d from master \"%s\"", count++, buf);
810 811
            xmlURIPtr parsed_uri = xmlParseURI(buf);
            if (parsed_uri == NULL) {
812
                ICECAST_LOG_DEBUG("Error while parsing line from master. Ignoring line.");
813 814
                continue;
            }
815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834

            n = realloc(new_relays, sizeof(*new_relays)*(new_relays_length + 1));
            if (n) {
                new_relays = n;

                c = calloc(1, sizeof(*c));
                new_relays[new_relays_length++] = c;
            }

            if (c) {
                if (parsed_uri->server != NULL) {
                    c->upstream_default.server = strdup(parsed_uri->server);
                    if (parsed_uri->port == 0) {
                        c->upstream_default.port = 80;
                    } else {
                        c->upstream_default.port = parsed_uri->port;
                    }
                } else {
                    c->upstream_default.server = (char *)xmlCharStrdup (master);
                    c->upstream_default.port = port;
835 836
                }

837 838 839 840 841
                c->upstream_default.mount = strdup(parsed_uri->path);
                c->localmount = strdup(parsed_uri->path);
                c->upstream_default.mp3metadata = 1;
                c->on_demand = on_demand;
                ICECAST_LOG_DEBUG("Added relay host=\"%s\", port=%d, mount=\"%s\"", c->upstream_default.server, c->upstream_default.port, c->upstream_default.mount);
842
            }
843
            xmlFreeURI(parsed_uri);
Michael Smith's avatar
Michael Smith committed
844
        }
845 846
        sock_close (mastersock);

847
        thread_mutex_lock (&(config_locks()->relay_lock));
848
        cleanup_relays = update_relays (&global.master_relays, new_relays, new_relays_length);
849

850
        relay_check_streams (global.master_relays, cleanup_relays, 0);
851 852 853 854 855

        for (i = 0; i < new_relays_length; i++) {
            relay_config_free(new_relays[i]);
        }
        free(new_relays);
856 857 858

        thread_mutex_unlock (&(config_locks()->relay_lock));

859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874
    } while(0);

    if (master)
        free (master);
    if (username)
        free (username);
    if (password)
        free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
875
    unsigned int interval = 0;
876

877 878
    (void)arg;

879
    thread_mutex_lock(&_slave_mutex);
880
    update_settings = 0;
881
    update_all_mounts = 0;
882
    thread_mutex_unlock(&_slave_mutex);
883

884
    config = config_get_config();
885
    stats_global(config);
886
    config_release_config();
887
    source_recheck_mounts(1);
888

889
    while (1)
890
    {
891
        relay_t *cleanup_relays = NULL;
892
        int skip_timer = 0;
893

894
        /* re-read xml file if requested */
895
        global_lock();
896 897
        if (global.schedule_config_reread) {
            config_reread_config();
898
            global.schedule_config_reread = 0;
899
        }
900
        global_unlock();
901

902
        thread_sleep(1000000);
903 904 905
        thread_mutex_lock(&_slave_mutex);
        if (slave_running == 0) {
            thread_mutex_unlock(&_slave_mutex);
906
            break;
907 908
        }
        thread_mutex_unlock(&_slave_mutex);
909 910

        ++interval;
911

912
        /* only update relays lists when required */
913
        thread_mutex_lock(&_slave_mutex);
914 915
        if (max_interval <= interval)
        {
916
            ICECAST_LOG_DEBUG("checking master stream list");
917
            config = config_get_config();
918

919 920
            if (max_interval == 0)
                skip_timer = 1;
921 922
            interval = 0;
            max_interval = config->master_update_interval;
923
            thread_mutex_unlock(&_slave_mutex);
924

925 926 927
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
928

929
            thread_mutex_lock (&(config_locks()->relay_lock));
930

931
            cleanup_relays = update_relays(&global.relays, config->relay, config->relay_length);
932

933 934 935
            config_release_config();
        }
        else
936 937
        {
            thread_mutex_unlock(&_slave_mutex);
938
            thread_mutex_lock (&(config_locks()->relay_lock));
939
        }
940 941 942 943 944

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

945
        thread_mutex_lock(&_slave_mutex);
946 947
        if (update_settings)
        {
948
            source_recheck_mounts (update_all_mounts);
949
            update_settings = 0;
950
            update_all_mounts = 0;
951
        }
952
        thread_mutex_unlock(&_slave_mutex);
953
    }
954
    ICECAST_LOG_INFO("shutting down current relays");
955 956
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
957

958
    ICECAST_LOG_INFO("Slave thread shutdown complete");
959

960
    return NULL;
961
}
962