slave.c 21.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14
15
16
17
18
19
20
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

21
22
23
24
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

40
#include "compat.h"
41

Karl Heyes's avatar
Karl Heyes committed
42
43
44
45
#include "thread/thread.h"
#include "avl/avl.h"
#include "net/sock.h"
#include "httpp/httpp.h"
46

47
#include "cfgfile.h"
48
49
50
51
52
53
54
55
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
Michael Smith's avatar
Michael Smith committed
56
#include "format.h"
57
#include "event.h"
58
59
60
61

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
62
static thread_type *_slave_thread_id;
63
static int slave_running = 0;
64
static volatile int update_settings = 0;
65
static volatile int update_all_mounts = 0;
66
static volatile unsigned int max_interval = 0;
67

68
/* Release one relay entry along with the strings and source it holds.
 * The entry that followed it in the list is returned so that a caller can
 * free a whole list with `relay = relay_free (relay)`.
 */
relay_server *relay_free (relay_server *relay)
{
    relay_server *following = relay->next;

    DEBUG1("freeing relay %s", relay->localmount);

    /* a relay that never reserved a source has nothing to hand back */
    if (relay->source != NULL)
        source_free_source (relay->source);

    xmlFree (relay->server);
    xmlFree (relay->mount);
    xmlFree (relay->localmount);

    /* the credentials are optional */
    if (relay->username != NULL)
        xmlFree (relay->username);
    if (relay->password != NULL)
        xmlFree (relay->password);

    free (relay);

    return following;
}

85

86
87
88
/* Create a new relay entry carrying the same configuration as `r`
 * (server, port, mounts, optional credentials, metadata and on-demand
 * flags).  The entry comes from calloc, so every field not copied here
 * starts out zeroed.  Returns NULL when the entry cannot be allocated.
 */
relay_server *relay_copy (relay_server *r)
{
    relay_server *replica = calloc (1, sizeof (relay_server));

    if (replica == NULL)
        return NULL;

    replica->server = (char *)xmlCharStrdup (r->server);
    replica->mount = (char *)xmlCharStrdup (r->mount);
    replica->localmount = (char *)xmlCharStrdup (r->localmount);

    /* username and password are only present on authenticated relays */
    if (r->username != NULL)
        replica->username = (char *)xmlCharStrdup (r->username);
    if (r->password != NULL)
        replica->password = (char *)xmlCharStrdup (r->password);

    replica->port = r->port;
    replica->mp3metadata = r->mp3metadata;
    replica->on_demand = r->on_demand;

    return replica;
}
105

106

107
/* force a recheck of the relays. This will recheck the master server if
108
 * this is a slave and rebuild all mountpoints in the stats tree
109
 */
110
void slave_update_all_mounts (void)
111
{
112
    max_interval = 0;
113
    update_all_mounts = 1;
114
    update_settings = 1;
115
116
}

117
118
119

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
120
 */
121
void slave_rebuild_mounts (void)
122
{
123
    update_settings = 1;
124
125
}

126
127

void slave_initialize(void)
Michael Smith's avatar
Michael Smith committed
128
{
129
130
    if (slave_running)
        return;
Michael Smith's avatar
Michael Smith committed
131

132
    slave_running = 1;
133
    max_interval = 0;
134
135
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
136

Michael Smith's avatar
Michael Smith committed
137

138
139
140
void slave_shutdown(void)
{
    if (!slave_running)
141
        return;
142
    slave_running = 0;
143
    DEBUG0 ("waiting for slave thread");
144
145
146
147
    thread_join (_slave_thread_id);
}


148
149
/* Actually open the connection and do some http parsing, handle any 302
 * responses within here.
150
 */
151
static client_t *open_relay_connection (relay_server *relay)
152
{
153
    int redirects = 0;
154
155
    char *server_id = NULL;
    ice_config_t *config;
156
157
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
158
159
160
161
    char *server = strdup (relay->server);
    char *mount = strdup (relay->mount);
    int port = relay->port;
    char *auth_header;
162
163
    char header[4096];

164
165
166
167
    config = config_get_config ();
    server_id = strdup (config->server_id);
    config_release_config ();

168
169
    /* build any authentication header before connecting */
    if (relay->username && relay->password)
170
    {
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
        char *esc_authorisation;
        unsigned len = strlen(relay->username) + strlen(relay->password) + 2;

        auth_header = malloc (len);
        snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
        esc_authorisation = util_base64_encode(auth_header);
        free(auth_header);
        len = strlen (esc_authorisation) + 24;
        auth_header = malloc (len);
        snprintf (auth_header, len,
                "Authorization: Basic %s\r\n", esc_authorisation);
        free(esc_authorisation);
    }
    else
        auth_header = strdup ("");
186

187
188
189
190
191
192
193
    while (redirects < 10)
    {
        sock_t streamsock;

        INFO2 ("connecting to %s:%d", server, port);

        streamsock = sock_connect_wto (server, port, 10);
194
195
        if (streamsock == SOCK_ERROR)
        {
196
            WARN2 ("Failed to connect to %s:%d", server, port);
197
            break;
Michael Smith's avatar
Michael Smith committed
198
        }
199
        con = connection_create (streamsock, -1, strdup (server));
200

201
202
203
204
205
206
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
207
                "User-Agent: %s\r\n"
208
                "%s"
209
                "%s"
210
                "\r\n",
211
                mount,
212
                server_id,
213
214
                relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
                auth_header);
215
        memset (header, 0, sizeof(header));
216
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
217
        {
218
            ERROR4 ("Header read failed for %s (%s:%d%s)", relay->localmount, server, port, mount);
219
220
221
222
223
224
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
        if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
        {
225
226
            ERROR4("Error parsing relay request for %s (%s:%d%s)", relay->localmount,
                    server, port, mount);
227
228
            break;
        }
229
        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
230
        {
231
232
233
            /* better retry the connection again but with different details */
            const char *uri, *mountpoint;
            int len;
234

235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
            uri = httpp_getvar (parser, "location");
            INFO1 ("redirect received %s", uri);
            if (strncmp (uri, "http://", 7) != 0)
                break;
            uri += 7;
            mountpoint = strchr (uri, '/');
            free (mount);
            if (mountpoint)
                mount = strdup (mountpoint);
            else
                mount = strdup ("/");

            len = strcspn (uri, ":/");
            port = 80;
            if (uri [len] == ':')
                port = atoi (uri+len+1);
            free (server);
            server = calloc (1, len+1);
            strncpy (server, uri, len);
            connection_close (con);
            httpp_destroy (parser);
256
257
258
            con = NULL;
            parser = NULL;
        }
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
        else
        {
            client_t *client = NULL;

            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
            {
                ERROR2("Error from relay request: %s (%s)", relay->localmount,
                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                break;
            }
            global_lock ();
            if (client_create (&client, con, parser) < 0)
            {
                global_unlock ();
                /* make sure only the client_destory frees these */
                con = NULL;
                parser = NULL;
                client_destroy (client);
                break;
            }
            global_unlock ();
            sock_set_blocking (streamsock, SOCK_NONBLOCK);
            client_set_queue (client, NULL);
            free (server);
            free (mount);
284
            free (server_id);
285
286
287
288
289
290
291
292
293
            free (auth_header);

            return client;
        }
        redirects++;
    }
    /* failed, better clean up */
    free (server);
    free (mount);
294
    free (server_id);
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
    free (auth_header);
    if (con)
        connection_close (con);
    if (parser)
        httpp_destroy (parser);
    return NULL;
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
static void *start_relay_stream (void *arg)
{
    relay_server *relay = arg;
    source_t *src = relay->source;
    client_t *client;

    INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
    do
    {
        client = open_relay_connection (relay);

        if (client == NULL)
            continue;

        src->client = client;
        src->parser = client->parser;
        src->con = client->con;
324
325

        if (connection_complete_source (src, 0) < 0)
326
        {
327
328
329
330
            INFO0("Failed to complete source initialisation");
            client_destroy (client);
            src->client = NULL;
            continue;
331
        }
332
        stats_event_inc(NULL, "source_relay_connections");
333
        stats_event (relay->localmount, "source_ip", client->con->ip);
334

335
336
        source_main (relay->source);

337
338
339
340
341
        if (relay->on_demand == 0)
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
            yp_remove (relay->localmount);
            relay->source->yp_public = -1;
342
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
343
            slave_update_all_mounts();
344
345
        }

346
        /* we've finished, now get cleaned up */
347
        relay->cleanup = 1;
348
        slave_rebuild_mounts();
349
350

        return NULL;
351
    } while (0);  /* TODO allow looping through multiple servers */
352

353
354
355
356
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

357
        DEBUG1 ("failed relay, fallback to %s", relay->source->fallback_mount);
358
359
360
361
362
363
364
365
366
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount (relay->source->fallback_mount);

        if (fallback_source != NULL)
            source_move_clients (relay->source, fallback_source);

        avl_tree_unlock (global.source_tree);
    }

367
    source_clear_source (relay->source);
368

369
370
    /* cleanup relay, but prevent this relay from starting up again too soon */
    relay->start = time(NULL) + max_interval;
371
372
373
    relay->cleanup = 1;

    return NULL;
374
375
376
377
378
379
380
381
}


/* Look at one relay and either start its thread or tidy up after it.
 *
 * A relay without a source first gets its local mountpoint reserved.  The
 * relay thread is then started if the relay has a source, is not already
 * running and its start time has been reached.  An on-demand relay is only
 * started once a request for it is pending, which is also assumed when its
 * overriding fallback mount is running with listeners.
 *
 * When no thread is started, a relay flagged for cleanup has its thread
 * joined and its running state reset.
 */
static void check_relay_stream (relay_server *relay)
{
    int launch = 0;

    if (relay->source == NULL)
    {
        if (relay->localmount[0] != '/')
        {
            WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
                    relay->localmount);
            return;
        }

        /* new relay, reserve the name */
        relay->source = source_reserve (relay->localmount);
        if (relay->source == NULL)
            WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
        else
        {
            DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
            if (relay->on_demand)
                slave_update_all_mounts();
        }
    }

    /* only consider relays that are configured, idle and due to start */
    if (relay->source != NULL && !relay->running && relay->start <= time(NULL))
    {
        source_t *src = relay->source;

        launch = 1;

        /* an inactive on-demand relay waits until something asks for it */
        if (relay->on_demand && src->on_demand_req == 0)
        {
            src->on_demand = relay->on_demand;

            if (src->fallback_mount && src->fallback_override)
            {
                source_t *fb;

                /* listeners on the fallback count as a request for this relay */
                avl_tree_rlock (global.source_tree);
                fb = source_find_mount (src->fallback_mount);
                if (fb && fb->running && fb->listeners)
                {
                   DEBUG2 ("fallback running %d with %lu listeners", fb->running, fb->listeners);
                   src->on_demand_req = 1;
                }
                avl_tree_unlock (global.source_tree);
            }
            if (src->on_demand_req == 0)
                launch = 0;
        }
    }

    if (launch)
    {
        relay->start = time(NULL) + 5;
        relay->running = 1;
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
    }

    /* the relay thread may have shut itself down */
    if (!relay->cleanup)
        return;

    if (relay->thread)
    {
        DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
        thread_join (relay->thread);
        relay->thread = NULL;
    }
    relay->cleanup = 0;
    relay->running = 0;

    /* re-apply the mount settings to an on-demand source that stays around */
    if (relay->on_demand && relay->source)
    {
        ice_config_t *config = config_get_config ();
        mount_proxy *mountinfo = config_find_mount (config, relay->localmount);

        source_update_settings (config, relay->source, mountinfo);
        config_release_config ();
        stats_event (relay->localmount, "listeners", "0");
    }
}

Michael Smith's avatar
Michael Smith committed
456

457
458
459
460
461
462
463
464
465
466
467
468
469
/* Decide whether relay `old` has to be restarted to match `new`.
 * Returns 1 when the remote mount, server, port or mp3 metadata setting
 * differ, 0 otherwise.  Credentials are not compared.  A difference in the
 * on-demand flag alone does not count as a change: the new value is simply
 * stored into `old`.
 */
static int relay_has_changed (relay_server *new, relay_server *old)
{
    int differs = strcmp (new->mount, old->mount) != 0
        || strcmp (new->server, old->server) != 0
        || new->port != old->port
        || new->mp3metadata != old->mp3metadata;

    if (differs)
        return 1;

    /* pick up an on-demand switch without restarting the relay */
    if (new->on_demand != old->on_demand)
        old->on_demand = new->on_demand;

    return 0;
}


480
481
482
483
484
485
486
487
488
489
490
491
492
/* Work out which relays to keep, given the wanted set in `updated`.
 *
 * For each entry in `updated`, an entry in *current with the same local
 * mount and unchanged settings is unlinked from *current and reused.
 * Otherwise a fresh copy of the wanted entry is made.  The returned list
 * holds the relays that should be running (in reverse order of `updated`).
 * Whatever is left in *current afterwards are the relays to shut down.
 *
 * A wanted relay that cannot be copied because memory ran out is skipped
 * with a warning.
 */
static relay_server *
update_relay_set (relay_server **current, relay_server *updated)
{
    relay_server *relay;
    relay_server *new_list = NULL;

    for (relay = updated; relay != NULL; relay = relay->next)
    {
        relay_server **link = current;
        relay_server *keep;

        /* stop on an existing relay that can be kept as it is */
        while (*link != NULL)
        {
            if (strcmp (relay->localmount, (*link)->localmount) == 0
                    && relay_has_changed (relay, *link) == 0)
                break;
            link = &(*link)->next;
        }

        keep = *link;
        if (keep != NULL)
        {
            /* take the match out of the current list */
            *link = keep->next;
        }
        else
        {
            /* new one, copy and insert */
            keep = relay_copy (relay);
            if (keep == NULL)
            {
                WARN1 ("out of memory adding relay \"%s\", skipping", relay->localmount);
                continue;
            }
        }
        keep->next = new_list;
        new_list = keep;
    }
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
524
 * are separated and returned in a separate list
525
 */
526
static relay_server *
527
528
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
529
    relay_server *active_relays, *cleanup_relays;
530

531
    active_relays = update_relay_set (relay_list, new_relay_list);
532

533
534
535
536
537
538
539
540
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


541
542
/* Process two relay lists.  Every relay on `to_free` is shut down (its
 * thread joined if it is running) and freed.  Every relay on `to_start` is
 * handed to check_relay_stream().  A non-zero skip_timer clears each
 * relay's start time first so that any hold-off delay is ignored.
 */
static void relay_check_streams (relay_server *to_start,
        relay_server *to_free, int skip_timer)
{
    relay_server *relay;

    while (to_free != NULL)
    {
        if (to_free->source != NULL && to_free->running)
        {
            /* relay has been removed from xml, shut down active relay */
            DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
            to_free->running = 0;
            to_free->source->running = 0;
            thread_join (to_free->thread);
        }
        else if (to_free->source != NULL)
        {
            /* idle relay, just drop its stats entry */
            stats_event (to_free->localmount, NULL, NULL);
        }
        to_free = relay_free (to_free);
    }

    for (relay = to_start; relay != NULL; relay = relay->next)
    {
        if (skip_timer)
            relay->start = 0;
        check_relay_stream (relay);
    }
}

574
575
576
577
578

/* Fetch the stream list from the configured master server and update
 * global.master_relays from it.
 *
 * `config` must be the locked configuration.  If the master username,
 * password, server or port is missing nothing is done, 0 is returned and
 * the config lock is still held.  Otherwise the config lock is released
 * before connecting and 1 is returned, whether or not the fetch worked, so
 * the caller has to acquire the config again.
 */
static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username = NULL;
    int port;
    sock_t mastersock;
    int ret = 0;
    char buf[256];

    do
    {
        char *authheader, *data;
        relay_server *new_relays = NULL, *cleanup_relays;
        size_t len;
        int count = 1;
        int on_demand;

        /* copy what is needed so the config lock can be dropped */
        if (config->master_username)
            username = strdup (config->master_username);
        if (config->master_password)
            password = strdup (config->master_password);
        if (config->master_server)
            master = strdup (config->master_server);
        port = config->master_server_port;

        if (username == NULL || password == NULL || master == NULL || port == 0)
            break;
        on_demand = config->on_demand;

        /* from here on the caller must re-acquire the config */
        ret = 1;
        config_release_config();
        mastersock = sock_connect_wto (master, port, 0);

        if (mastersock == SOCK_ERROR)
        {
            WARN0("Relay slave failed to contact master server to fetch stream list");
            break;
        }

        /* "username:password" plus the terminating NUL */
        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        if (authheader == NULL)
        {
            sock_close (mastersock);
            break;
        }
        snprintf (authheader, len, "%s:%s", username, password);
        data = util_base64_encode(authheader);
        free(authheader);
        if (data == NULL)
        {
            sock_close (mastersock);
            break;
        }
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(data);

        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
                strncmp (buf, "HTTP/1.0 200", 12) != 0)
        {
            sock_close (mastersock);
            WARN0 ("Master rejected streamlist request");
            break;
        }

        /* skip the remaining response headers, up to the empty line */
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }

        /* each non-empty line of the body is a mountpoint on the master */
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            relay_server *r;
            if (!strlen(buf))
                continue;
            DEBUG2 ("read %d from master \"%s\"", count++, buf);
            r = calloc (1, sizeof (relay_server));
            if (r)
            {
                r->server = (char *)xmlCharStrdup (master);
                r->port = port;
                r->mount = (char *)xmlCharStrdup (buf);
                r->localmount = (char *)xmlCharStrdup (buf);
                r->mp3metadata = 1;
                r->on_demand = on_demand;
                r->next = new_relays;
                new_relays = r;
            }
        }
        sock_close (mastersock);

        thread_mutex_lock (&(config_locks()->relay_lock));
        cleanup_relays = update_relays (&global.master_relays, new_relays);

        relay_check_streams (global.master_relays, cleanup_relays, 0);
        /* the fetched list is no longer needed, this call frees it */
        relay_check_streams (NULL, new_relays, 0);

        thread_mutex_unlock (&(config_locks()->relay_lock));

    } while(0);

    free (master);
    free (username);
    free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
680
    unsigned int interval = 0;
Michael Smith's avatar
Michael Smith committed
681

682
    update_settings = 0;
683
    update_all_mounts = 0;
684

Karl Heyes's avatar
Karl Heyes committed
685
686
687
    config = config_get_config();
    stats_global (config);
    config_release_config();
688
    source_recheck_mounts (1);
689

690
    while (1)
691
    {
692
693
        relay_server *cleanup_relays = NULL;
        int skip_timer = 0;
694

695
696
697
698
699
700
701
        /* re-read xml file if requested */
        if (global . schedule_config_reread)
        {
            event_config_read (NULL);
            global . schedule_config_reread = 0;
        }

702
        thread_sleep (1000000);
703
704
        if (slave_running == 0)
            break;
705
706

        ++interval;
707

708
709
710
711
712
        /* only update relays lists when required */
        if (max_interval <= interval)
        {
            DEBUG0 ("checking master stream list");
            config = config_get_config();
Michael Smith's avatar
Michael Smith committed
713

714
715
            if (max_interval == 0)
                skip_timer = 1;
716
717
            interval = 0;
            max_interval = config->master_update_interval;
Michael Smith's avatar
Michael Smith committed
718

719
720
721
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
722

723
            thread_mutex_lock (&(config_locks()->relay_lock));
724

725
            cleanup_relays = update_relays (&global.relays, config->relay);
726

727
728
729
730
            config_release_config();
        }
        else
            thread_mutex_lock (&(config_locks()->relay_lock));
731
732
733
734
735

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

736
737
        if (update_settings)
        {
738
            source_recheck_mounts (update_all_mounts);
739
            update_settings = 0;
740
            update_all_mounts = 0;
741
        }
742
    }
743
    INFO0 ("shutting down current relays");
744
745
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
746

747
748
    INFO0 ("Slave thread shutdown complete");

749
    return NULL;
750
}
Michael Smith's avatar
Michael Smith committed
751