slave.c 21.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14
15
16
17
18
19
20
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

21
22
23
24
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

40
#include "compat.h"
41

Karl Heyes's avatar
Karl Heyes committed
42
43
44
45
#include "thread/thread.h"
#include "avl/avl.h"
#include "net/sock.h"
#include "httpp/httpp.h"
46

47
#include "cfgfile.h"
48
49
50
51
52
53
54
55
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
Michael Smith's avatar
Michael Smith committed
56
#include "format.h"
57
#include "event.h"
58
59
60
61

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
62
static thread_type *_slave_thread_id;
63
static int slave_running = 0;
64
static volatile int update_settings = 0;
65
static volatile int update_all_mounts = 0;
66
static volatile unsigned int max_interval = 0;
67

68
/* Release one relay entry: its source (if any), its strings and the
 * structure itself.  The entry that followed it in the list is returned so
 * a caller can walk and free a list with `relay = relay_free (relay)`.
 */
relay_server *relay_free (relay_server *relay)
{
    relay_server *following = relay->next;

    DEBUG1("freeing relay %s", relay->localmount);

    if (relay->source != NULL)
        source_free_source (relay->source);

    /* these three are released unconditionally */
    xmlFree (relay->server);
    xmlFree (relay->mount);
    xmlFree (relay->localmount);

    /* credentials are optional */
    if (relay->username != NULL)
        xmlFree (relay->username);
    if (relay->password != NULL)
        xmlFree (relay->password);

    free (relay);

    return following;
}

85

86
87
88
/* Create a new relay entry carrying the same connection settings as r.
 * Only the server, mount, localmount, optional credentials, port,
 * mp3metadata and on_demand fields are duplicated; every other field is
 * left zeroed by calloc.  Returns NULL if the structure cannot be allocated.
 */
relay_server *relay_copy (relay_server *r)
{
    relay_server *dup = calloc (1, sizeof (relay_server));

    if (dup == NULL)
        return NULL;

    dup->server = (char *)xmlCharStrdup (r->server);
    dup->mount = (char *)xmlCharStrdup (r->mount);
    dup->localmount = (char *)xmlCharStrdup (r->localmount);

    if (r->username != NULL)
        dup->username = (char *)xmlCharStrdup (r->username);
    if (r->password != NULL)
        dup->password = (char *)xmlCharStrdup (r->password);

    dup->port = r->port;
    dup->mp3metadata = r->mp3metadata;
    dup->on_demand = r->on_demand;

    return dup;
}
105

106

107
/* force a recheck of the relays. This will recheck the master server if
108
 * this is a slave and rebuild all mountpoints in the stats tree
109
 */
110
void slave_update_all_mounts (void)
111
{
112
    max_interval = 0;
113
    update_all_mounts = 1;
114
    update_settings = 1;
115
116
}

117
118
119

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
120
 */
121
void slave_rebuild_mounts (void)
122
{
123
    update_settings = 1;
124
125
}

126
127

void slave_initialize(void)
Michael Smith's avatar
Michael Smith committed
128
{
129
130
    if (slave_running)
        return;
Michael Smith's avatar
Michael Smith committed
131

132
    slave_running = 1;
133
    max_interval = 0;
134
135
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
136

Michael Smith's avatar
Michael Smith committed
137

138
139
140
void slave_shutdown(void)
{
    if (!slave_running)
141
        return;
142
    slave_running = 0;
143
    DEBUG0 ("waiting for slave thread");
144
145
146
147
    thread_join (_slave_thread_id);
}


148
149
/* Actually open the connection and do some http parsing, handle any 302
 * responses within here.
150
 */
151
static client_t *open_relay_connection (relay_server *relay)
152
{
153
    int redirects = 0;
154
155
    char *server_id = NULL;
    ice_config_t *config;
156
157
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
158
159
160
161
    char *server = strdup (relay->server);
    char *mount = strdup (relay->mount);
    int port = relay->port;
    char *auth_header;
162
163
    char header[4096];

164
165
166
167
    config = config_get_config ();
    server_id = strdup (config->server_id);
    config_release_config ();

168
169
    /* build any authentication header before connecting */
    if (relay->username && relay->password)
170
    {
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
        char *esc_authorisation;
        unsigned len = strlen(relay->username) + strlen(relay->password) + 2;

        auth_header = malloc (len);
        snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
        esc_authorisation = util_base64_encode(auth_header);
        free(auth_header);
        len = strlen (esc_authorisation) + 24;
        auth_header = malloc (len);
        snprintf (auth_header, len,
                "Authorization: Basic %s\r\n", esc_authorisation);
        free(esc_authorisation);
    }
    else
        auth_header = strdup ("");
186

187
188
189
190
191
192
193
    while (redirects < 10)
    {
        sock_t streamsock;

        INFO2 ("connecting to %s:%d", server, port);

        streamsock = sock_connect_wto (server, port, 10);
194
195
        if (streamsock == SOCK_ERROR)
        {
196
            WARN2 ("Failed to connect to %s:%d", server, port);
197
            break;
Michael Smith's avatar
Michael Smith committed
198
        }
199
        con = connection_create (streamsock, -1, strdup (server));
200

201
202
203
204
205
206
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
207
                "User-Agent: %s\r\n"
208
                "%s"
209
                "%s"
210
                "\r\n",
211
                mount,
212
                server_id,
213
214
                relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
                auth_header);
215
        memset (header, 0, sizeof(header));
216
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
217
        {
218
            ERROR4 ("Header read failed for %s (%s:%d%s)", relay->localmount, server, port, mount);
219
220
221
222
223
224
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
        if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
        {
225
226
            ERROR4("Error parsing relay request for %s (%s:%d%s)", relay->localmount,
                    server, port, mount);
227
228
            break;
        }
229
        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
230
        {
231
232
233
            /* better retry the connection again but with different details */
            const char *uri, *mountpoint;
            int len;
234

235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
            uri = httpp_getvar (parser, "location");
            INFO1 ("redirect received %s", uri);
            if (strncmp (uri, "http://", 7) != 0)
                break;
            uri += 7;
            mountpoint = strchr (uri, '/');
            free (mount);
            if (mountpoint)
                mount = strdup (mountpoint);
            else
                mount = strdup ("/");

            len = strcspn (uri, ":/");
            port = 80;
            if (uri [len] == ':')
                port = atoi (uri+len+1);
            free (server);
            server = calloc (1, len+1);
            strncpy (server, uri, len);
            connection_close (con);
            httpp_destroy (parser);
256
257
258
            con = NULL;
            parser = NULL;
        }
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
        else
        {
            client_t *client = NULL;

            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
            {
                ERROR2("Error from relay request: %s (%s)", relay->localmount,
                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                break;
            }
            global_lock ();
            if (client_create (&client, con, parser) < 0)
            {
                global_unlock ();
                /* make sure only the client_destory frees these */
                con = NULL;
                parser = NULL;
                client_destroy (client);
                break;
            }
            global_unlock ();
            sock_set_blocking (streamsock, SOCK_NONBLOCK);
            client_set_queue (client, NULL);
            free (server);
            free (mount);
284
            free (server_id);
285
286
287
288
289
290
291
292
293
            free (auth_header);

            return client;
        }
        redirects++;
    }
    /* failed, better clean up */
    free (server);
    free (mount);
294
    free (server_id);
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
    free (auth_header);
    if (con)
        connection_close (con);
    if (parser)
        httpp_destroy (parser);
    return NULL;
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
static void *start_relay_stream (void *arg)
{
    relay_server *relay = arg;
    source_t *src = relay->source;
    client_t *client;

    INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
    do
    {
        client = open_relay_connection (relay);

        if (client == NULL)
            continue;

        src->client = client;
        src->parser = client->parser;
        src->con = client->con;
324
325

        if (connection_complete_source (src, 0) < 0)
326
        {
327
328
329
330
            INFO0("Failed to complete source initialisation");
            client_destroy (client);
            src->client = NULL;
            continue;
331
        }
332
        stats_event_inc(NULL, "source_relay_connections");
333
        stats_event (relay->localmount, "source_ip", client->con->ip);
334

335
336
        source_main (relay->source);

337
338
339
340
341
        if (relay->on_demand == 0)
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
            yp_remove (relay->localmount);
            relay->source->yp_public = -1;
342
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
343
            slave_update_all_mounts();
344
345
        }

346
        /* we've finished, now get cleaned up */
347
        relay->cleanup = 1;
348
        slave_rebuild_mounts();
349
350

        return NULL;
351
    } while (0);  /* TODO allow looping through multiple servers */
352

353
354
355
356
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

357
        DEBUG1 ("failed relay, fallback to %s", relay->source->fallback_mount);
358
359
360
361
362
363
364
365
366
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount (relay->source->fallback_mount);

        if (fallback_source != NULL)
            source_move_clients (relay->source, fallback_source);

        avl_tree_unlock (global.source_tree);
    }

367
    source_clear_source (relay->source);
368

369
    /* cleanup relay, but prevent this relay from starting up again too soon */
Karl Heyes's avatar
Karl Heyes committed
370
    relay->source->on_demand = 0;
371
    relay->start = time(NULL) + max_interval;
372
373
374
    relay->cleanup = 1;

    return NULL;
375
376
377
378
379
380
381
382
}


/* Decide whether a relay thread should be launched now.  Returns 0 when the
 * relay has no source, is already running or its start time has not been
 * reached.  For an on-demand relay without a pending request this also
 * refreshes source->on_demand and raises source->on_demand_req when the
 * overriding fallback mount is running with listeners; the relay is then
 * only started if a request is pending.
 */
static int relay_wants_start (relay_server *relay)
{
    source_t *src = relay->source;

    /* skip relay if active, not configured or just not time yet */
    if (relay->source == NULL || relay->running || relay->start > time(NULL))
        return 0;

    /* plain relays, and on-demand relays with a request, start right away */
    if (relay->on_demand == 0 || src->on_demand_req != 0)
        return 1;

    /* check if an inactive on-demand relay has a fallback that has listeners */
    relay->source->on_demand = relay->on_demand;

    if (src->fallback_mount && src->fallback_override)
    {
        source_t *fallback;

        avl_tree_rlock (global.source_tree);
        fallback = source_find_mount (src->fallback_mount);
        if (fallback && fallback->running && fallback->listeners)
        {
           DEBUG2 ("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
           src->on_demand_req = 1;
        }
        avl_tree_unlock (global.source_tree);
    }

    return src->on_demand_req != 0;
}


/* Bring one relay entry up to date: reserve its source if it has none yet,
 * launch its thread when it is due, and otherwise tidy up after a relay
 * thread that has flagged itself as finished.
 */
static void check_relay_stream (relay_server *relay)
{
    if (relay->source == NULL)
    {
        if (relay->localmount[0] != '/')
        {
            WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
                    relay->localmount);
            return;
        }

        /* new relay, reserve the name */
        relay->source = source_reserve (relay->localmount);
        if (relay->source == NULL)
            WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
        else
        {
            DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
            if (relay->on_demand)
            {
                ice_config_t *config = config_get_config ();
                mount_proxy *mountinfo = config_find_mount (config, relay->localmount);

                /* settings are only applied here when no mount entry exists */
                if (mountinfo == NULL)
                    source_update_settings (config, relay->source, mountinfo);
                config_release_config ();
                stats_event (relay->localmount, "listeners", "0");
                slave_update_all_mounts();
            }
        }
    }

    if (relay_wants_start (relay))
    {
        /* hold off any further start attempt for a few seconds */
        relay->start = time(NULL) + 5;
        relay->running = 1;
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
    }

    /* the relay thread may have shut itself down */
    if (relay->cleanup == 0)
        return;

    if (relay->thread)
    {
        DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
        thread_join (relay->thread);
        relay->thread = NULL;
    }
    relay->cleanup = 0;
    relay->running = 0;

    if (relay->on_demand && relay->source)
    {
        ice_config_t *config = config_get_config ();
        mount_proxy *mountinfo = config_find_mount (config, relay->localmount);

        source_update_settings (config, relay->source, mountinfo);
        config_release_config ();
        stats_event (relay->localmount, "listeners", "0");
    }
}

Michael Smith's avatar
Michael Smith committed
465

466
467
468
469
470
471
472
473
474
475
476
477
478
/* Compare the connection details of two relay entries.  Returns 1 if the
 * mount, server, port or mp3metadata setting differs, meaning the relay
 * needs to be restarted, and 0 otherwise.  When 0 is returned the on_demand
 * setting of old has been brought in line with new, as that change does not
 * need a restart.
 */
static int relay_has_changed (relay_server *new, relay_server *old)
{
    int differs = strcmp (new->mount, old->mount) != 0
        || strcmp (new->server, old->server) != 0
        || new->port != old->port
        || new->mp3metadata != old->mp3metadata;

    if (differs)
        return 1;

    if (new->on_demand != old->on_demand)
        old->on_demand = new->on_demand;

    return 0;
}


489
490
491
492
493
494
495
496
497
498
499
500
501
/* Go through updated looking for relays that are configured differently.
 * For every entry in updated, a matching unchanged entry (same localmount
 * and relay_has_changed reporting 0) is unlinked from *current and moved to
 * the returned list; if there is none, a copy of the updated entry is added
 * instead.  On return *current holds the entries that were not matched,
 * which is the list of relays to shut down.
 *
 * An entry that cannot be copied because allocation failed is skipped with
 * a warning, it will be picked up again on a later refresh.
 */
static relay_server *
update_relay_set (relay_server **current, relay_server *updated)
{
    relay_server *relay;
    relay_server *new_list = NULL;

    for (relay = updated; relay; relay = relay->next)
    {
        relay_server *existing_relay = *current;
        relay_server **existing_p = current;

        while (existing_relay)
        {
            /* break out if keeping relay */
            if (strcmp (relay->localmount, existing_relay->localmount) == 0 &&
                    relay_has_changed (relay, existing_relay) == 0)
                break;
            existing_p = &existing_relay->next;
            existing_relay = existing_relay->next;
        }
        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
            existing_relay = relay_copy (relay);
            if (existing_relay == NULL)
            {
                WARN1 ("unable to allocate relay entry for \"%s\", skipping",
                        relay->localmount);
                continue;
            }
        }
        else
        {
            /* unlink the kept entry from the current list */
            *existing_p = existing_relay->next;
        }
        existing_relay->next = new_list;
        new_list = existing_relay;
    }
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
533
 * are separated and returned in a separate list
534
 */
535
static relay_server *
536
537
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
538
    relay_server *active_relays, *cleanup_relays;
539

540
    active_relays = update_relay_set (relay_list, new_relay_list);
541

542
543
544
545
546
547
548
549
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


550
551
/* Free every relay on the to_free list, stopping and joining the thread of
 * any that is still running, then run check_relay_stream on each relay of
 * the to_start list.  A non-zero skip_timer clears the start time of the
 * relays in to_start first so they are not held back by it.
 */
static void relay_check_streams (relay_server *to_start,
        relay_server *to_free, int skip_timer)
{
    relay_server *relay;

    while (to_free != NULL)
    {
        if (to_free->source != NULL && to_free->running)
        {
            /* relay has been removed from xml, shut down active relay */
            DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
            to_free->running = 0;
            to_free->source->running = 0;
            thread_join (to_free->thread);
        }
        else if (to_free->source != NULL)
            stats_event (to_free->localmount, NULL, NULL);

        /* relay_free hands back the next entry in the list */
        to_free = relay_free (to_free);
    }

    for (relay = to_start; relay != NULL; relay = relay->next)
    {
        if (skip_timer)
            relay->start = 0;
        check_relay_stream (relay);
    }
}

583
584
585
586
587

/* Fetch /admin/streamlist.txt from the configured master server and bring
 * global.master_relays in line with the mounts it lists.
 *
 * config must be the configuration currently held by the caller.  Returns 0
 * if the master details (username, password, server, port) are incomplete,
 * in which case nothing was done and the configuration is still held.
 * Returns 1 once a fetch has been attempted; the configuration has then
 * been released in here, whether or not the fetch worked, and the caller
 * has to re-acquire it.
 */
static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username= NULL;
    int port;
    sock_t mastersock;
    int ret = 0;
    char buf[256];

    do
    {
        char *authheader, *data = NULL;
        relay_server *new_relays = NULL, *cleanup_relays;
        int len, count = 1;
        int on_demand;

        /* take copies as the config lock is dropped before connecting */
        if (config->master_username)
            username = strdup (config->master_username);
        if (config->master_password)
            password = strdup (config->master_password);
        if (config->master_server)
            master = strdup (config->master_server);
        port = config->master_server_port;

        if (username == NULL || password == NULL || master == NULL || port == 0)
            break;
        on_demand = config->on_demand;
        ret = 1;
        config_release_config();
        mastersock = sock_connect_wto (master, port, 0);

        if (mastersock == SOCK_ERROR)
        {
            WARN0("Relay slave failed to contact master server to fetch stream list");
            break;
        }

        /* "user:pass" plus terminator, base64 encoded for Basic auth */
        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        if (authheader)
        {
            snprintf (authheader, len, "%s:%s", username, password);
            data = util_base64_encode(authheader);
            free(authheader);
        }
        if (data == NULL)
        {
            sock_close (mastersock);
            WARN0 ("Unable to build authorisation for streamlist request");
            break;
        }
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(data);

        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
                strncmp (buf, "HTTP/1.0 200", 12) != 0)
        {
            sock_close (mastersock);
            WARN0 ("Master rejected streamlist request");
            break;
        }

        /* skip the remaining response headers, up to the blank line */
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }
        /* each non-empty body line is a mountpoint on the master */
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            relay_server *r;
            if (!strlen(buf))
                continue;
            DEBUG2 ("read %d from master \"%s\"", count++, buf);
            r = calloc (1, sizeof (relay_server));
            if (r)
            {
                r->server = (char *)xmlCharStrdup (master);
                r->port = port;
                r->mount = (char *)xmlCharStrdup (buf);
                r->localmount = (char *)xmlCharStrdup (buf);
                r->mp3metadata = 1;
                r->on_demand = on_demand;
                r->next = new_relays;
                new_relays = r;
            }
        }
        sock_close (mastersock);

        thread_mutex_lock (&(config_locks()->relay_lock));
        cleanup_relays = update_relays (&global.master_relays, new_relays);

        relay_check_streams (global.master_relays, cleanup_relays, 0);
        /* update_relays works on copies, so the fetched list is freed here */
        relay_check_streams (NULL, new_relays, 0);

        thread_mutex_unlock (&(config_locks()->relay_lock));

    } while(0);

    free (master);
    free (username);
    free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
689
    unsigned int interval = 0;
Michael Smith's avatar
Michael Smith committed
690

691
    update_settings = 0;
692
    update_all_mounts = 0;
693

Karl Heyes's avatar
Karl Heyes committed
694
695
696
    config = config_get_config();
    stats_global (config);
    config_release_config();
697
    source_recheck_mounts (1);
698

699
    while (1)
700
    {
701
702
        relay_server *cleanup_relays = NULL;
        int skip_timer = 0;
703

704
705
706
707
708
709
710
        /* re-read xml file if requested */
        if (global . schedule_config_reread)
        {
            event_config_read (NULL);
            global . schedule_config_reread = 0;
        }

711
        thread_sleep (1000000);
712
713
        if (slave_running == 0)
            break;
714
715

        ++interval;
716

717
718
719
720
721
        /* only update relays lists when required */
        if (max_interval <= interval)
        {
            DEBUG0 ("checking master stream list");
            config = config_get_config();
Michael Smith's avatar
Michael Smith committed
722

723
724
            if (max_interval == 0)
                skip_timer = 1;
725
726
            interval = 0;
            max_interval = config->master_update_interval;
Michael Smith's avatar
Michael Smith committed
727

728
729
730
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
731

732
            thread_mutex_lock (&(config_locks()->relay_lock));
733

734
            cleanup_relays = update_relays (&global.relays, config->relay);
735

736
737
738
739
            config_release_config();
        }
        else
            thread_mutex_lock (&(config_locks()->relay_lock));
740
741
742
743
744

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

745
746
        if (update_settings)
        {
747
            source_recheck_mounts (update_all_mounts);
748
            update_settings = 0;
749
            update_all_mounts = 0;
750
        }
751
    }
752
    INFO0 ("shutting down current relays");
753
754
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
755

756
757
    INFO0 ("Slave thread shutdown complete");

758
    return NULL;
759
}
Michael Smith's avatar
Michael Smith committed
760