slave.c 18.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org, 
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17 18 19 20
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

21 22 23 24
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

42
#include "compat.h"
43

Karl Heyes's avatar
Karl Heyes committed
44 45 46 47
#include "thread/thread.h"
#include "avl/avl.h"
#include "net/sock.h"
#include "httpp/httpp.h"
48

49
#include "cfgfile.h"
50 51 52 53 54 55 56 57
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
Michael Smith's avatar
Michael Smith committed
58
#include "format.h"
59
#include "event.h"
60 61 62 63

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
64
static thread_type *_slave_thread_id;
65
static int slave_running = 0;
66
static int update_settings = 0;
67
static volatile unsigned int max_interval = 0;
68

69
relay_server *relay_free (relay_server *relay)
70
{
71 72 73 74 75 76 77
    relay_server *next = relay->next;
    DEBUG1("freeing relay %s", relay->localmount);
    if (relay->source)
       source_free_source (relay->source);
    xmlFree (relay->server);
    xmlFree (relay->mount);
    xmlFree (relay->localmount);
78 79 80 81
    if (relay->username)
        xmlFree (relay->username);
    if (relay->password)
        xmlFree (relay->password);
82
    free (relay);
83
    return next;
84 85
}

86

87 88 89
relay_server *relay_copy (relay_server *r)
{
    relay_server *copy = calloc (1, sizeof (relay_server));
Michael Smith's avatar
Michael Smith committed
90

91
    if (copy)
Michael Smith's avatar
Michael Smith committed
92
    {
93 94 95
        copy->server = xmlStrdup (r->server);
        copy->mount = xmlStrdup (r->mount);
        copy->localmount = xmlStrdup (r->localmount);
96 97 98 99
        if (r->username)
            copy->username = xmlStrdup (r->username);
        if (r->password)
            copy->password = xmlStrdup (r->password);
100 101
        copy->port = r->port;
        copy->mp3metadata = r->mp3metadata;
102
        copy->on_demand = r->on_demand;
Michael Smith's avatar
Michael Smith committed
103
    }
104 105
    return copy;
}
106

107

108 109 110
/* force a recheck of the relays. This will recheck the master server if
 * a this is a slave.
 */
111
void slave_recheck_mounts (void)
112
{
113
    max_interval = 0;
114
    update_settings = 1;
115 116
}

117 118 119

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
120
 */
121
void slave_rebuild_mounts (void)
122
{
123
    update_settings = 1;
124 125
}

126 127

void slave_initialize(void)
Michael Smith's avatar
Michael Smith committed
128
{
129 130
    if (slave_running)
        return;
Michael Smith's avatar
Michael Smith committed
131

132
    slave_running = 1;
133
    max_interval = 0;
134 135
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
136

Michael Smith's avatar
Michael Smith committed
137

138 139 140
void slave_shutdown(void)
{
    if (!slave_running)
141
        return;
142
    slave_running = 0;
143
    DEBUG0 ("waiting for slave thread");
144 145 146 147 148 149 150
    thread_join (_slave_thread_id);
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
151
static void *start_relay_stream (void *arg)
152
{
153
    relay_server *relay = arg;
154 155 156 157 158 159
    sock_t streamsock = SOCK_ERROR;
    source_t *src = relay->source;
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
    char header[4096];

160
    relay->running = 1;
161 162 163
    INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
    do
    {
164 165
        char *auth_header;

166 167 168 169 170 171
        streamsock = sock_connect_wto (relay->server, relay->port, 30);
        if (streamsock == SOCK_ERROR)
        {
            WARN3("Failed to relay stream from master server, couldn't connect to http://%s:%d%s",
                    relay->server, relay->port, relay->mount);
            break;
Michael Smith's avatar
Michael Smith committed
172
        }
173
        con = connection_create (streamsock, -1, NULL);
174

175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
        if (relay->username && relay->password)
        {
            char *esc_authorisation;
            unsigned len = strlen(relay->username) + strlen(relay->password) + 2;

            auth_header = malloc (len);
            snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
            esc_authorisation = util_base64_encode(auth_header);
            free(auth_header);
            len = strlen (esc_authorisation) + 24;
            auth_header = malloc (len);
            snprintf (auth_header, len,
                    "Authorization: Basic %s\r\n", esc_authorisation);
            free(esc_authorisation);
        }
        else
        {
            auth_header = strdup ("");
        }

195 196 197 198 199 200
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
201
                "User-Agent: %s\r\n"
202
                "%s"
203
                "%s"
204
                "\r\n",
205
                relay->mount,
206
                ICECAST_VERSION_STRING,
207 208 209
                relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
                auth_header);
        free (auth_header);
210
        memset (header, 0, sizeof(header));
211
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
212 213 214 215 216 217 218 219
        {
            WARN0("Header read failed");
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
        if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
        {
Michael Smith's avatar
Michael Smith committed
220
            ERROR0("Error parsing relay request");
221 222 223 224 225 226 227 228 229
            break;
        }
        if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
        {
            ERROR1("Error from relay request: %s", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
            break;
        }
        src->parser = parser;
        src->con = con;
230

231
        global_lock ();
232 233
        if (client_create (&src->client, con, parser) < 0)
        {
234
            global_unlock ();
235 236 237 238 239
            /* make sure only the client_destory frees these */
            con = NULL;
            parser = NULL;
            break;
        }
240
        global_unlock ();
241
        sock_set_blocking (streamsock, SOCK_NONBLOCK);
242 243
        con = NULL;
        parser = NULL;
244 245 246
        client_set_queue (src->client, NULL);

        if (connection_complete_source (src, 0) < 0)
247 248 249 250
        {
            DEBUG0("Failed to complete source initialisation");
            break;
        }
251
        stats_event_inc(NULL, "source_relay_connections");
252
        stats_event (relay->localmount, "source_ip", relay->server);
253

254 255
        source_main (relay->source);

256 257 258 259 260
        if (relay->on_demand == 0)
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
            yp_remove (relay->localmount);
            relay->source->yp_public = -1;
261
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
262 263
        }

264
        /* we've finished, now get cleaned up */
265 266 267
        relay->cleanup = 1;

        return NULL;
268 269
    } while (0);

270 271 272 273
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

274
        DEBUG1 ("failed relay, fallback to %s", relay->source->fallback_mount);
275 276 277 278 279 280 281 282 283
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount (relay->source->fallback_mount);

        if (fallback_source != NULL)
            source_move_clients (relay->source, fallback_source);

        avl_tree_unlock (global.source_tree);
    }

284 285 286 287 288 289
    if (con)
        connection_close (con);
    src->con = NULL;
    if (parser)
        httpp_destroy (parser);
    src->parser = NULL;
290
    source_clear_source (relay->source);
291

292 293
    /* cleanup relay, but prevent this relay from starting up again too soon */
    relay->start = time(NULL) + max_interval;
294 295 296
    relay->cleanup = 1;

    return NULL;
297 298 299 300 301 302 303 304
}


/* wrapper for starting the provided relay stream */
static void check_relay_stream (relay_server *relay)
{
    if (relay->source == NULL)
    {
305 306 307 308 309 310
        if (relay->localmount[0] != '/')
        {
            WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
                    relay->localmount);
            return;
        }
311 312
        /* new relay, reserve the name */
        relay->source = source_reserve (relay->localmount);
313
        if (relay->source)
314
        {
315
            DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
316 317
            slave_rebuild_mounts();
        }
318 319
        else
            WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
320
    }
321
    do
322
    {
323
        source_t *source = relay->source;
324 325
        /* skip relay if active, not configured or just not time yet */
        if (relay->source == NULL || relay->running || relay->start > time(NULL))
326
            break;
327
        if (relay->on_demand && source->on_demand_req == 0)
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
        {
            relay->source->on_demand = relay->on_demand;

            if (source->fallback_mount && source->fallback_override)
            {
                source_t *fallback;
                DEBUG1 ("checking %s for fallback override", source->fallback_mount);
                avl_tree_rlock (global.source_tree);
                fallback = source_find_mount (source->fallback_mount);
                if (fallback && fallback->running && fallback->listeners)
                {
                   DEBUG2 ("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
                   source->on_demand_req = 1;
                }
                avl_tree_unlock (global.source_tree);
            }
            if (source->on_demand_req == 0)
                break;
        }

348
        relay->start = time(NULL) + 5;
349 350 351
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
352

353
    } while (0);
354
    /* the relay thread may of shut down itself */
355
    if (relay->cleanup)
356
    {
357 358 359 360 361 362
        if (relay->thread)
        {
            DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
            thread_join (relay->thread);
            relay->thread = NULL;
        }
363 364
        relay->cleanup = 0;
        relay->running = 0;
365 366 367 368 369 370 371 372 373

        if (relay->on_demand)
        {
            ice_config_t *config = config_get_config ();
            mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
            source_update_settings (config, relay->source, mountinfo);
            config_release_config ();
            stats_event (relay->localmount, "listeners", "0");
        }
Michael Smith's avatar
Michael Smith committed
374
    }
375 376
}

Michael Smith's avatar
Michael Smith committed
377

378 379 380 381 382 383 384 385 386 387 388 389 390
/* compare the 2 relays to see if there are any changes, return 1 if
 * the relay needs to be restarted, 0 otherwise
 */
static int relay_has_changed (relay_server *new, relay_server *old)
{
    do
    {
        if (strcmp (new->mount, old->mount) != 0)
            break;
        if (strcmp (new->server, old->server) != 0)
            break;
        if (new->port != old->port)
            break;
391 392
        if (new->mp3metadata != old->mp3metadata)
            break;
393 394
        if (new->on_demand != old->on_demand)
            old->on_demand = new->on_demand;
395 396 397 398 399 400
        return 0;
    } while (0);
    return 1;
}


401 402 403 404 405 406 407 408 409 410 411 412 413
/* go through updated looking for relays that are different configured. The
 * returned list contains relays that should be kept running, current contains
 * the list of relays to shutdown
 */
static relay_server *
update_relay_set (relay_server **current, relay_server *updated)
{
    relay_server *relay = updated;
    relay_server *existing_relay, **existing_p;
    relay_server *new_list = NULL;

    while (relay)
    {
414 415 416 417 418 419 420
        existing_relay = *current;
        existing_p = current;

        while (existing_relay)
        {
            /* break out if keeping relay */
            if (strcmp (relay->localmount, existing_relay->localmount) == 0)
421 422
                if (relay_has_changed (relay, existing_relay) == 0)
                    break;
423 424 425 426 427 428 429 430 431 432 433 434 435 436 437
            existing_p = &existing_relay->next;
            existing_relay = existing_relay->next;
        }
        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
            existing_relay = relay_copy (relay);
        }
        else
        {
            *existing_p = existing_relay->next;
        }
        existing_relay->next = new_list;
        new_list = existing_relay;
        relay = relay->next;
438
    }
439 440 441 442 443 444
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
445
 * are separated and returned in a separate list
446
 */
447
static relay_server *
448 449
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
450
    relay_server *active_relays, *cleanup_relays;
451

452
    active_relays = update_relay_set (relay_list, new_relay_list);
453

454 455 456 457 458 459 460 461
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


462 463
static void relay_check_streams (relay_server *to_start,
        relay_server *to_free, int skip_timer)
464 465 466 467
{
    relay_server *relay;

    while (to_free)
468
    {
469
        if (to_free->source)
470
        {
471 472 473 474 475 476 477 478 479 480
            if (to_free->running)
            {
                /* relay has been removed from xml, shut down active relay */
                DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
                to_free->source->running = 0;
                thread_join (to_free->thread);
                slave_rebuild_mounts();
            }
            else
                stats_event (to_free->localmount, NULL, NULL);
481
        }
482 483 484 485 486 487
        to_free = relay_free (to_free);
    }

    relay = to_start;
    while (relay)
    {
488 489
        if (skip_timer)
            relay->start = 0;
490 491
        check_relay_stream (relay);
        relay = relay->next;
492
    }
Michael Smith's avatar
Michael Smith committed
493 494
}

495 496 497 498 499

static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username= NULL;
    int port;
500
    sock_t mastersock;
501
    int ret = 0;
502
    char buf[256];
503 504 505
    do
    {
        char *authheader, *data;
506
        relay_server *new_relays = NULL, *cleanup_relays;
507
        int len, count = 1;
508
        int on_demand;
509

510
        username = strdup (config->master_username);
511 512
        if (config->master_password)
            password = strdup (config->master_password);
Michael Smith's avatar
Michael Smith committed
513

514 515
        if (config->master_server)
            master = strdup (config->master_server);
Michael Smith's avatar
Michael Smith committed
516

517
        port = config->master_server_port;
Michael Smith's avatar
Michael Smith committed
518

519 520
        if (password == NULL || master == NULL || port == 0)
            break;
521
        on_demand = config->on_demand;
522 523 524
        ret = 1;
        config_release_config();
        mastersock = sock_connect_wto (master, port, 0);
525

526 527 528 529 530
        if (mastersock == SOCK_ERROR)
        {
            WARN0("Relay slave failed to contact master server to fetch stream list");
            break;
        }
Michael Smith's avatar
Michael Smith committed
531

532 533 534
        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        snprintf (authheader, len, "%s:%s", username, password);
535 536 537 538 539 540 541 542
        data = util_base64_encode(authheader);
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(authheader);
        free(data);

543 544 545 546 547 548 549 550
        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
                strncmp (buf, "HTTP/1.0 200", 12) != 0)
        {
            sock_close (mastersock);
            WARN0 ("Master rejected streamlist request");
            break;
        }

551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            relay_server *r;
            if (!strlen(buf))
                continue;
            DEBUG2 ("read %d from master \"%s\"", count++, buf);
            r = calloc (1, sizeof (relay_server));
            if (r)
            {
                r->server = xmlStrdup (master);
                r->port = port;
                r->mount = xmlStrdup (buf);
                r->localmount = xmlStrdup (buf);
                r->mp3metadata = 1;
570
                r->on_demand = on_demand;
571 572
                r->next = new_relays;
                new_relays = r;
573
            }
Michael Smith's avatar
Michael Smith committed
574
        }
575 576
        sock_close (mastersock);

577 578 579
        thread_mutex_lock (&(config_locks()->relay_lock));
        cleanup_relays = update_relays (&global.master_relays, new_relays);
        
580 581
        relay_check_streams (global.master_relays, cleanup_relays, 0);
        relay_check_streams (NULL, new_relays, 0);
582 583 584

        thread_mutex_unlock (&(config_locks()->relay_lock));

585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600
    } while(0);

    if (master)
        free (master);
    if (username)
        free (username);
    if (password)
        free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
601
    unsigned int interval = 0;
Michael Smith's avatar
Michael Smith committed
602

603 604
    source_recheck_mounts();

605
    while (1)
606
    {
607 608
        relay_server *cleanup_relays = NULL;
        int skip_timer = 0;
609

610 611 612 613 614 615 616
        /* re-read xml file if requested */
        if (global . schedule_config_reread)
        {
            event_config_read (NULL);
            global . schedule_config_reread = 0;
        }

617
        thread_sleep (1000000);
618 619
        if (slave_running == 0)
            break;
620 621

        ++interval;
622

623 624 625 626 627
        /* only update relays lists when required */
        if (max_interval <= interval)
        {
            DEBUG0 ("checking master stream list");
            config = config_get_config();
Michael Smith's avatar
Michael Smith committed
628

629 630
            if (max_interval == 0)
                skip_timer = 1;
631 632
            interval = 0;
            max_interval = config->master_update_interval;
Michael Smith's avatar
Michael Smith committed
633

634 635 636
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
637

638
            thread_mutex_lock (&(config_locks()->relay_lock));
639

640
            cleanup_relays = update_relays (&global.relays, config->relay);
641

642 643 644 645
            config_release_config();
        }
        else
            thread_mutex_lock (&(config_locks()->relay_lock));
646 647 648 649 650

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

651 652 653 654 655
        if (update_settings)
        {
            update_settings = 0;
            source_recheck_mounts();
        }
656
    }
657
    DEBUG0 ("shutting down current relays");
658 659
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
660

661 662
    INFO0 ("Slave thread shutdown complete");

663
    return NULL;
664
}
Michael Smith's avatar
Michael Smith committed
665