slave.c 20.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14
15
16
17
18
19
20
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

21
22
23
24
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

42
#include "compat.h"
43

Karl Heyes's avatar
Karl Heyes committed
44
45
46
47
#include "thread/thread.h"
#include "avl/avl.h"
#include "net/sock.h"
#include "httpp/httpp.h"
48

49
#include "cfgfile.h"
50
51
52
53
54
55
56
57
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
Michael Smith's avatar
Michael Smith committed
58
#include "format.h"
59
#include "event.h"
60
61
62
63

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
64
static thread_type *_slave_thread_id;
65
static int slave_running = 0;
66
static int update_settings = 0;
67
static volatile unsigned int max_interval = 0;
68

69
/* Release one relay entry along with the source and strings it holds.
 *
 * relay: the entry to release; it is dereferenced unconditionally, so it
 *        must not be NULL.
 *
 * Returns the entry that followed it (relay->next as read before the free),
 * which lets a caller dispose of a whole list with  r = relay_free (r);
 */
relay_server *relay_free (relay_server *relay)
{
    relay_server *following = relay->next;

    DEBUG1("freeing relay %s", relay->localmount);

    if (relay->source != NULL)
       source_free_source (relay->source);

    /* the string members are released with xmlFree */
    xmlFree (relay->server);
    xmlFree (relay->mount);
    xmlFree (relay->localmount);

    /* credentials are optional, so only free the ones that are set */
    if (relay->username != NULL)
        xmlFree (relay->username);
    if (relay->password != NULL)
        xmlFree (relay->password);

    free (relay);

    return following;
}

86

87
88
89
/* Create a new relay entry carrying the same settings as r.
 *
 * The server, mount, localmount and (when set) username/password strings
 * are duplicated with xmlStrdup; port, mp3metadata and on_demand are copied
 * by value.  Every other member of the new entry is left zeroed by calloc.
 *
 * Returns the new entry, or NULL if it could not be allocated.
 */
relay_server *relay_copy (relay_server *r)
{
    relay_server *dup = calloc (1, sizeof *dup);

    if (dup == NULL)
        return NULL;

    dup->server = xmlStrdup (r->server);
    dup->mount = xmlStrdup (r->mount);
    dup->localmount = xmlStrdup (r->localmount);

    /* optional credentials stay NULL in the copy when absent */
    if (r->username != NULL)
        dup->username = xmlStrdup (r->username);
    if (r->password != NULL)
        dup->password = xmlStrdup (r->password);

    dup->port = r->port;
    dup->mp3metadata = r->mp3metadata;
    dup->on_demand = r->on_demand;

    return dup;
}
106

107

108
109
110
/* force a recheck of the relays. This will recheck the master server if
 * a this is a slave.
 */
111
void slave_recheck_mounts (void)
112
{
113
    max_interval = 0;
114
    update_settings = 1;
115
116
}

117
118
119

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
120
 */
121
void slave_rebuild_mounts (void)
122
{
123
    update_settings = 1;
124
125
}

126
127

void slave_initialize(void)
Michael Smith's avatar
Michael Smith committed
128
{
129
130
    if (slave_running)
        return;
Michael Smith's avatar
Michael Smith committed
131

132
    slave_running = 1;
133
    max_interval = 0;
134
135
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
136

Michael Smith's avatar
Michael Smith committed
137

138
139
140
void slave_shutdown(void)
{
    if (!slave_running)
141
        return;
142
    slave_running = 0;
143
    DEBUG0 ("waiting for slave thread");
144
145
146
147
    thread_join (_slave_thread_id);
}


148
149
/* Actually open the connection and do some http parsing, handle any 302
 * responses within here.
150
 */
151
static client_t *open_relay_connection (relay_server *relay)
152
{
153
    int redirects = 0;
154
155
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
156
157
158
159
    char *server = strdup (relay->server);
    char *mount = strdup (relay->mount);
    int port = relay->port;
    char *auth_header;
160
161
    char header[4096];

162
163
    /* build any authentication header before connecting */
    if (relay->username && relay->password)
164
    {
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
        char *esc_authorisation;
        unsigned len = strlen(relay->username) + strlen(relay->password) + 2;

        auth_header = malloc (len);
        snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
        esc_authorisation = util_base64_encode(auth_header);
        free(auth_header);
        len = strlen (esc_authorisation) + 24;
        auth_header = malloc (len);
        snprintf (auth_header, len,
                "Authorization: Basic %s\r\n", esc_authorisation);
        free(esc_authorisation);
    }
    else
        auth_header = strdup ("");
180

181
182
183
184
185
186
187
    while (redirects < 10)
    {
        sock_t streamsock;

        INFO2 ("connecting to %s:%d", server, port);

        streamsock = sock_connect_wto (server, port, 10);
188
189
        if (streamsock == SOCK_ERROR)
        {
190
            WARN2 ("Failed to connect to %s:%d", server, port);
191
            break;
Michael Smith's avatar
Michael Smith committed
192
        }
193
        con = connection_create (streamsock, -1, strdup (server));
194

195
196
197
198
199
200
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
201
                "User-Agent: %s\r\n"
202
                "%s"
203
                "%s"
204
                "\r\n",
205
                mount,
206
                ICECAST_VERSION_STRING,
207
208
                relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
                auth_header);
209
        memset (header, 0, sizeof(header));
210
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
211
        {
212
            ERROR4 ("Header read failed for %s (%s:%d%s)", relay->localmount, server, port, mount);
213
214
215
216
217
218
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
        if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
        {
219
220
            ERROR4("Error parsing relay request for %s (%s:%d%s)", relay->localmount,
                    server, port, mount);
221
222
            break;
        }
223
        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
224
        {
225
226
227
            /* better retry the connection again but with different details */
            const char *uri, *mountpoint;
            int len;
228

229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
            uri = httpp_getvar (parser, "location");
            INFO1 ("redirect received %s", uri);
            if (strncmp (uri, "http://", 7) != 0)
                break;
            uri += 7;
            mountpoint = strchr (uri, '/');
            free (mount);
            if (mountpoint)
                mount = strdup (mountpoint);
            else
                mount = strdup ("/");

            len = strcspn (uri, ":/");
            port = 80;
            if (uri [len] == ':')
                port = atoi (uri+len+1);
            free (server);
            server = calloc (1, len+1);
            strncpy (server, uri, len);
            connection_close (con);
            httpp_destroy (parser);
250
251
252
            con = NULL;
            parser = NULL;
        }
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
        else
        {
            client_t *client = NULL;

            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
            {
                ERROR2("Error from relay request: %s (%s)", relay->localmount,
                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                break;
            }
            global_lock ();
            if (client_create (&client, con, parser) < 0)
            {
                global_unlock ();
                /* make sure only the client_destory frees these */
                con = NULL;
                parser = NULL;
                client_destroy (client);
                break;
            }
            global_unlock ();
            sock_set_blocking (streamsock, SOCK_NONBLOCK);
            client_set_queue (client, NULL);
            free (server);
            free (mount);
            free (auth_header);

            return client;
        }
        redirects++;
    }
    /* failed, better clean up */
    free (server);
    free (mount);
    free (auth_header);
    if (con)
        connection_close (con);
    if (parser)
        httpp_destroy (parser);
    return NULL;
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
static void *start_relay_stream (void *arg)
{
    relay_server *relay = arg;
    source_t *src = relay->source;
    client_t *client;
    ice_config_t *config;

    INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
    do
    {
        client = open_relay_connection (relay);

        if (client == NULL)
            continue;

        src->client = client;
        src->parser = client->parser;
        src->con = client->con;
317
318

        if (connection_complete_source (src, 0) < 0)
319
        {
320
321
322
323
            INFO0("Failed to complete source initialisation");
            client_destroy (client);
            src->client = NULL;
            continue;
324
        }
325
        stats_event_inc(NULL, "source_relay_connections");
326
327
328
329
330
        stats_event (relay->localmount, "source_ip", client->con->ip);
        config = config_get_config();
        stats_event_args (relay->localmount, "listenurl", "http://%s:%d%s",
                config->hostname, config->port, relay->localmount);
        config_release_config();
331

332
333
        source_main (relay->source);

334
335
336
337
338
        if (relay->on_demand == 0)
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
            yp_remove (relay->localmount);
            relay->source->yp_public = -1;
339
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
340
341
        }

342
        /* we've finished, now get cleaned up */
343
344
345
        relay->cleanup = 1;

        return NULL;
346
    } while (0);  /* TODO allow looping through multiple servers */
347

348
349
350
351
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

352
        DEBUG1 ("failed relay, fallback to %s", relay->source->fallback_mount);
353
354
355
356
357
358
359
360
361
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount (relay->source->fallback_mount);

        if (fallback_source != NULL)
            source_move_clients (relay->source, fallback_source);

        avl_tree_unlock (global.source_tree);
    }

362
    source_clear_source (relay->source);
363

364
365
    /* cleanup relay, but prevent this relay from starting up again too soon */
    relay->start = time(NULL) + max_interval;
366
367
368
    relay->cleanup = 1;

    return NULL;
369
370
371
372
373
374
375
376
}


/* Wrapper for starting the provided relay stream.
 *
 * Reserves a source for a relay that does not have one yet, starts the
 * relay thread when the relay is idle and due (on-demand relays only when
 * a request is pending or an overriding fallback has listeners), and
 * otherwise reaps a relay thread that has flagged itself for cleanup.
 */
static void check_relay_stream (relay_server *relay)
{
    source_t *source;
    int launch = 0;

    if (relay->source == NULL)
    {
        if (relay->localmount[0] != '/')
        {
            WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
                    relay->localmount);
            return;
        }
        /* new relay, reserve the name */
        relay->source = source_reserve (relay->localmount);
        if (relay->source == NULL)
            WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
        else
        {
            DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
            slave_rebuild_mounts();
        }
    }

    source = relay->source;

    /* only consider a relay that is configured, inactive and due */
    if (source != NULL && !relay->running && relay->start <= time(NULL))
    {
        launch = 1;
        if (relay->on_demand && source->on_demand_req == 0)
        {
            relay->source->on_demand = relay->on_demand;

            if (source->fallback_mount && source->fallback_override)
            {
                source_t *fallback;

                avl_tree_rlock (global.source_tree);
                fallback = source_find_mount (source->fallback_mount);
                if (fallback && fallback->running && fallback->listeners)
                {
                   DEBUG2 ("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
                   source->on_demand_req = 1;
                }
                avl_tree_unlock (global.source_tree);
            }
            /* still nobody asking for this on-demand relay */
            if (source->on_demand_req == 0)
                launch = 0;
        }
    }

    if (launch)
    {
        relay->start = time(NULL) + 5;
        relay->running = 1;
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
    }

    /* the relay thread may have shut itself down */
    if (relay->cleanup)
    {
        if (relay->thread)
        {
            DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
            thread_join (relay->thread);
            relay->thread = NULL;
        }
        relay->cleanup = 0;
        relay->running = 0;

        if (relay->on_demand && relay->source)
        {
            ice_config_t *config = config_get_config ();
            mount_proxy *mountinfo = config_find_mount (config, relay->localmount);

            source_update_settings (config, relay->source, mountinfo);
            config_release_config ();
            stats_event (relay->localmount, "listeners", "0");
        }
    }
}

Michael Smith's avatar
Michael Smith committed
449

450
451
452
453
454
455
456
457
458
459
460
461
462
/* report whether two optional strings differ; a NULL only matches a NULL */
static int relay_string_differs (const char *a, const char *b)
{
    if (a == NULL || b == NULL)
        return a != b;
    return strcmp (a, b) != 0;
}


/* Compare the 2 relays to see if there are any changes that require the
 * relay to be restarted.
 *
 * new: the relay as currently configured
 * old: the relay that is already in use
 *
 * Returns 1 if the relay needs to be restarted (mount, server, port,
 * mp3metadata, username or password differ), 0 otherwise.  The credentials
 * are part of the comparison because they are sent with the relay request,
 * so a change to them only takes effect on a new connection.
 *
 * Side effect: a differing on_demand setting does not force a restart;
 * when everything else matches it is copied from new into old instead.
 */
static int relay_has_changed (relay_server *new, relay_server *old)
{
    if (strcmp (new->mount, old->mount) != 0)
        return 1;
    if (strcmp (new->server, old->server) != 0)
        return 1;
    if (new->port != old->port)
        return 1;
    if (new->mp3metadata != old->mp3metadata)
        return 1;
    if (relay_string_differs (new->username, old->username))
        return 1;
    if (relay_string_differs (new->password, old->password))
        return 1;

    /* on_demand can be switched without restarting the relay */
    if (new->on_demand != old->on_demand)
        old->on_demand = new->on_demand;

    return 0;
}


473
474
475
476
477
478
479
480
481
482
483
484
485
/* Go through updated looking for relays that are configured differently
 * from the ones in *current.
 *
 * current: in/out list of relays in use.  Entries that match an entry of
 *          updated (same localmount and relay_has_changed reports 0) are
 *          unlinked from it; what is left behind on return is the list of
 *          relays to shut down.
 * updated: the wanted set of relays; it is only read, entries without a
 *          match in *current are duplicated with relay_copy.
 *
 * Returns the list of relays to keep running or to start.  An entry whose
 * copy cannot be allocated is left out (with a warning) rather than
 * dereferenced.
 */
static relay_server *
update_relay_set (relay_server **current, relay_server *updated)
{
    relay_server *relay = updated;
    relay_server *existing_relay, **existing_p;
    relay_server *new_list = NULL;

    while (relay)
    {
        existing_relay = *current;
        existing_p = current;

        while (existing_relay)
        {
            /* break out if keeping relay */
            if (strcmp (relay->localmount, existing_relay->localmount) == 0
                    && relay_has_changed (relay, existing_relay) == 0)
                break;
            existing_p = &existing_relay->next;
            existing_relay = existing_relay->next;
        }

        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
            existing_relay = relay_copy (relay);
        }
        else
        {
            /* unlink the kept relay from the current list */
            *existing_p = existing_relay->next;
        }

        if (existing_relay != NULL)
        {
            existing_relay->next = new_list;
            new_list = existing_relay;
        }
        else
            WARN1 ("unable to allocate relay \"%s\", skipping", relay->localmount);

        relay = relay->next;
    }
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
517
 * are separated and returned in a separate list
518
 */
519
static relay_server *
520
521
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
522
    relay_server *active_relays, *cleanup_relays;
523

524
    active_relays = update_relay_set (relay_list, new_relay_list);
525

526
527
528
529
530
531
532
533
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


534
535
/* Shut down and free one list of relays, then check another list for
 * relays to start or clean up.
 *
 * to_start:   relays passed one by one to check_relay_stream; may be NULL.
 * to_free:    relays to dispose of; a running one has its source stopped
 *             and its thread joined first.  Every entry is released with
 *             relay_free.  May be NULL.
 * skip_timer: when non-zero, each relay's start time is reset to 0 before
 *             it is checked.
 */
static void relay_check_streams (relay_server *to_start,
        relay_server *to_free, int skip_timer)
{
    relay_server *relay;

    /* relay_free hands back the next entry, which advances the loop */
    for (; to_free != NULL; to_free = relay_free (to_free))
    {
        if (to_free->source == NULL)
            continue;

        if (!to_free->running)
        {
            stats_event (to_free->localmount, NULL, NULL);
            continue;
        }

        /* relay has been removed from xml, shut down active relay */
        DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
        to_free->running = 0;
        to_free->source->running = 0;
        thread_join (to_free->thread);
        slave_rebuild_mounts();
    }

    for (relay = to_start; relay != NULL; relay = relay->next)
    {
        if (skip_timer)
            relay->start = 0;
        check_relay_stream (relay);
    }
}

568
569
570
571
572

/* Fetch the stream list from the configured master server and update
 * global.master_relays from it.
 *
 * config: the configuration as obtained by the caller; the master
 *         username, password, server, port and on_demand settings are
 *         copied out of it before it is released.
 *
 * Returns 0 when no master is usable (username, password or server
 * missing, or port 0); config has NOT been released in that case.
 * Returns 1 once config_release_config() has been called, whether or not
 * the master could be contacted, so the caller has to get the config again
 * before using it.
 */
static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username = NULL;
    int port;
    sock_t mastersock;
    int ret = 0;
    char buf[256];

    do
    {
        char *authheader, *data;
        relay_server *new_relays = NULL, *cleanup_relays;
        size_t len;
        int count = 1;
        int on_demand;

        /* copy what is needed so the config can be released while the
         * connection is in progress; strdup must not be given a NULL */
        if (config->master_username)
            username = strdup (config->master_username);
        if (config->master_password)
            password = strdup (config->master_password);
        if (config->master_server)
            master = strdup (config->master_server);
        port = config->master_server_port;

        if (username == NULL || password == NULL || master == NULL || port == 0)
            break;
        on_demand = config->on_demand;
        ret = 1;
        config_release_config();
        mastersock = sock_connect_wto (master, port, 0);

        if (mastersock == SOCK_ERROR)
        {
            WARN0("Relay slave failed to contact master server to fetch stream list");
            break;
        }

        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        data = NULL;
        if (authheader)
        {
            snprintf (authheader, len, "%s:%s", username, password);
            data = util_base64_encode(authheader);
            free(authheader);
        }
        if (data == NULL)
        {
            sock_close (mastersock);
            WARN0 ("Relay slave unable to build authorization for master request");
            break;
        }
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(data);

        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
                strncmp (buf, "HTTP/1.0 200", 12) != 0)
        {
            sock_close (mastersock);
            WARN0 ("Master rejected streamlist request");
            break;
        }

        /* skip the rest of the response headers, up to the empty line */
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (buf[0] == '\0')
                break;
        }
        /* each remaining non-empty line names one mount on the master */
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            relay_server *r;
            if (buf[0] == '\0')
                continue;
            DEBUG2 ("read %d from master \"%s\"", count++, buf);
            r = calloc (1, sizeof (relay_server));
            if (r)
            {
                r->server = xmlStrdup (master);
                r->port = port;
                r->mount = xmlStrdup (buf);
                r->localmount = xmlStrdup (buf);
                r->mp3metadata = 1;
                r->on_demand = on_demand;
                r->next = new_relays;
                new_relays = r;
            }
        }
        sock_close (mastersock);

        thread_mutex_lock (&(config_locks()->relay_lock));
        cleanup_relays = update_relays (&global.master_relays, new_relays);

        relay_check_streams (global.master_relays, cleanup_relays, 0);
        /* the list read from the master was only a template, free it */
        relay_check_streams (NULL, new_relays, 0);

        thread_mutex_unlock (&(config_locks()->relay_lock));

    } while(0);

    free (master);
    free (username);
    free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
674
    unsigned int interval = 0;
Michael Smith's avatar
Michael Smith committed
675

676
677
    source_recheck_mounts();

678
    while (1)
679
    {
680
681
        relay_server *cleanup_relays = NULL;
        int skip_timer = 0;
682

683
684
685
686
687
688
689
        /* re-read xml file if requested */
        if (global . schedule_config_reread)
        {
            event_config_read (NULL);
            global . schedule_config_reread = 0;
        }

690
        thread_sleep (1000000);
691
692
        if (slave_running == 0)
            break;
693
694

        ++interval;
695

696
697
698
699
700
        /* only update relays lists when required */
        if (max_interval <= interval)
        {
            DEBUG0 ("checking master stream list");
            config = config_get_config();
Michael Smith's avatar
Michael Smith committed
701

702
703
            if (max_interval == 0)
                skip_timer = 1;
704
705
            interval = 0;
            max_interval = config->master_update_interval;
Michael Smith's avatar
Michael Smith committed
706

707
708
709
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
710

711
            thread_mutex_lock (&(config_locks()->relay_lock));
712

713
            cleanup_relays = update_relays (&global.relays, config->relay);
714

715
716
717
718
            config_release_config();
        }
        else
            thread_mutex_lock (&(config_locks()->relay_lock));
719
720
721
722
723

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

724
725
726
727
728
        if (update_settings)
        {
            update_settings = 0;
            source_recheck_mounts();
        }
729
    }
730
    INFO0 ("shutting down current relays");
731
732
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
733

734
735
    INFO0 ("Slave thread shutdown complete");

736
    return NULL;
737
}
Michael Smith's avatar
Michael Smith committed
738