slave.c 21.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org, 
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14
15
16
17
18
19
20
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

21
22
23
24
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

42
#include "compat.h"
43

Karl Heyes's avatar
Karl Heyes committed
44
45
46
47
#include "thread/thread.h"
#include "avl/avl.h"
#include "net/sock.h"
#include "httpp/httpp.h"
48

49
#include "cfgfile.h"
50
51
52
53
54
55
56
57
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
Michael Smith's avatar
Michael Smith committed
58
#include "format.h"
59
#include "event.h"
60
61
62
63

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
64
static thread_type *_slave_thread_id;
65
static int slave_running = 0;
66
67
static volatile int update_settings = 0;
static volatile int update_streams = 0;
68
static volatile unsigned int max_interval = 0;
69

70
relay_server *relay_free (relay_server *relay)
71
{
72
73
74
75
76
77
78
    relay_server *next = relay->next;
    DEBUG1("freeing relay %s", relay->localmount);
    if (relay->source)
       source_free_source (relay->source);
    xmlFree (relay->server);
    xmlFree (relay->mount);
    xmlFree (relay->localmount);
79
80
81
82
    if (relay->username)
        xmlFree (relay->username);
    if (relay->password)
        xmlFree (relay->password);
83
    free (relay);
84
    return next;
85
86
}

87

88
89
90
relay_server *relay_copy (relay_server *r)
{
    relay_server *copy = calloc (1, sizeof (relay_server));
Michael Smith's avatar
Michael Smith committed
91

92
    if (copy)
Michael Smith's avatar
Michael Smith committed
93
    {
94
95
96
        copy->server = xmlStrdup (r->server);
        copy->mount = xmlStrdup (r->mount);
        copy->localmount = xmlStrdup (r->localmount);
97
98
99
100
        if (r->username)
            copy->username = xmlStrdup (r->username);
        if (r->password)
            copy->password = xmlStrdup (r->password);
101
102
        copy->port = r->port;
        copy->mp3metadata = r->mp3metadata;
103
        copy->on_demand = r->on_demand;
Michael Smith's avatar
Michael Smith committed
104
    }
105
106
    return copy;
}
107

108

109
110
111
/* force a recheck of the relays. This will recheck the master server if
 * a this is a slave.
 */
112
void slave_recheck_all (void)
113
{
114
    max_interval = 0;
115
    update_streams = 1;
116
    update_settings = 1;
117
118
}

119
120
121

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
122
 */
123
void slave_rebuild_mounts (void)
124
{
125
    update_settings = 1;
126
127
}

128
129

void slave_initialize(void)
Michael Smith's avatar
Michael Smith committed
130
{
131
132
    if (slave_running)
        return;
Michael Smith's avatar
Michael Smith committed
133

134
    slave_running = 1;
135
    max_interval = 0;
136
137
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
138

Michael Smith's avatar
Michael Smith committed
139

140
141
142
void slave_shutdown(void)
{
    if (!slave_running)
143
        return;
144
    slave_running = 0;
145
    DEBUG0 ("waiting for slave thread");
146
147
148
149
    thread_join (_slave_thread_id);
}


150
151
/* Actually open the connection and do some http parsing, handle any 302
 * responses within here.
152
 */
153
static client_t *open_relay_connection (relay_server *relay)
154
{
155
    int redirects = 0;
156
157
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
158
159
160
161
    char *server = strdup (relay->server);
    char *mount = strdup (relay->mount);
    int port = relay->port;
    char *auth_header;
162
163
    char header[4096];

164
165
    /* build any authentication header before connecting */
    if (relay->username && relay->password)
166
    {
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
        char *esc_authorisation;
        unsigned len = strlen(relay->username) + strlen(relay->password) + 2;

        auth_header = malloc (len);
        snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
        esc_authorisation = util_base64_encode(auth_header);
        free(auth_header);
        len = strlen (esc_authorisation) + 24;
        auth_header = malloc (len);
        snprintf (auth_header, len,
                "Authorization: Basic %s\r\n", esc_authorisation);
        free(esc_authorisation);
    }
    else
        auth_header = strdup ("");
182

183
184
185
186
187
188
189
    while (redirects < 10)
    {
        sock_t streamsock;

        INFO2 ("connecting to %s:%d", server, port);

        streamsock = sock_connect_wto (server, port, 10);
190
191
        if (streamsock == SOCK_ERROR)
        {
192
            WARN2 ("Failed to connect to %s:%d", server, port);
193
            break;
Michael Smith's avatar
Michael Smith committed
194
        }
195
        con = connection_create (streamsock, -1, strdup (server));
196

197
198
199
200
201
202
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
203
                "User-Agent: %s\r\n"
204
                "%s"
205
                "%s"
206
                "\r\n",
207
                mount,
208
                ICECAST_VERSION_STRING,
209
210
                relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
                auth_header);
211
        memset (header, 0, sizeof(header));
212
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
213
        {
214
            ERROR4 ("Header read failed for %s (%s:%d%s)", relay->localmount, server, port, mount);
215
216
217
218
219
220
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
        if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
        {
221
222
            ERROR4("Error parsing relay request for %s (%s:%d%s)", relay->localmount,
                    server, port, mount);
223
224
            break;
        }
225
        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
226
        {
227
228
229
            /* better retry the connection again but with different details */
            const char *uri, *mountpoint;
            int len;
230

231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
            uri = httpp_getvar (parser, "location");
            INFO1 ("redirect received %s", uri);
            if (strncmp (uri, "http://", 7) != 0)
                break;
            uri += 7;
            mountpoint = strchr (uri, '/');
            free (mount);
            if (mountpoint)
                mount = strdup (mountpoint);
            else
                mount = strdup ("/");

            len = strcspn (uri, ":/");
            port = 80;
            if (uri [len] == ':')
                port = atoi (uri+len+1);
            free (server);
            server = calloc (1, len+1);
            strncpy (server, uri, len);
            connection_close (con);
            httpp_destroy (parser);
252
253
254
            con = NULL;
            parser = NULL;
        }
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
        else
        {
            client_t *client = NULL;

            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
            {
                ERROR2("Error from relay request: %s (%s)", relay->localmount,
                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                break;
            }
            global_lock ();
            if (client_create (&client, con, parser) < 0)
            {
                global_unlock ();
                /* make sure only the client_destory frees these */
                con = NULL;
                parser = NULL;
                client_destroy (client);
                break;
            }
            global_unlock ();
            sock_set_blocking (streamsock, SOCK_NONBLOCK);
            client_set_queue (client, NULL);
            free (server);
            free (mount);
            free (auth_header);

            return client;
        }
        redirects++;
    }
    /* failed, better clean up */
    free (server);
    free (mount);
    free (auth_header);
    if (con)
        connection_close (con);
    if (parser)
        httpp_destroy (parser);
    return NULL;
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
static void *start_relay_stream (void *arg)
{
    relay_server *relay = arg;
    source_t *src = relay->source;
    client_t *client;

    INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
    do
    {
        client = open_relay_connection (relay);

        if (client == NULL)
            continue;

        src->client = client;
        src->parser = client->parser;
        src->con = client->con;
318
319

        if (connection_complete_source (src, 0) < 0)
320
        {
321
322
323
324
            INFO0("Failed to complete source initialisation");
            client_destroy (client);
            src->client = NULL;
            continue;
325
        }
326
        stats_event_inc(NULL, "source_relay_connections");
327
        stats_event (relay->localmount, "source_ip", client->con->ip);
328

329
330
        source_main (relay->source);

331
332
333
334
335
        if (relay->on_demand == 0)
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
            yp_remove (relay->localmount);
            relay->source->yp_public = -1;
336
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
337
338
        }

339
        /* we've finished, now get cleaned up */
340
        relay->cleanup = 1;
341
        slave_rebuild_mounts();
342
343

        return NULL;
344
    } while (0);  /* TODO allow looping through multiple servers */
345

346
347
348
349
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

350
        DEBUG1 ("failed relay, fallback to %s", relay->source->fallback_mount);
351
352
353
354
355
356
357
358
359
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount (relay->source->fallback_mount);

        if (fallback_source != NULL)
            source_move_clients (relay->source, fallback_source);

        avl_tree_unlock (global.source_tree);
    }

360
    source_clear_source (relay->source);
361

362
363
    /* cleanup relay, but prevent this relay from starting up again too soon */
    relay->start = time(NULL) + max_interval;
364
365
366
    relay->cleanup = 1;

    return NULL;
367
368
369
370
371
372
373
374
}


/* wrapper for starting the provided relay stream */
static void check_relay_stream (relay_server *relay)
{
    if (relay->source == NULL)
    {
375
376
377
378
379
380
        if (relay->localmount[0] != '/')
        {
            WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
                    relay->localmount);
            return;
        }
381
382
        /* new relay, reserve the name */
        relay->source = source_reserve (relay->localmount);
383
        if (relay->source)
384
        {
385
            ice_config_t *config;
386
            DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
387
388
389
390
391
            config = config_get_config();
            stats_event_args (relay->localmount, "listenurl", "http://%s:%d%s",
                    config->hostname, config->port, relay->localmount);
            config_release_config();
            stats_event (relay->localmount, "listeners", "0");
392
393
            slave_rebuild_mounts();
        }
394
395
        else
            WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
396
    }
397
    do
398
    {
399
        source_t *source = relay->source;
400
401
        /* skip relay if active, not configured or just not time yet */
        if (relay->source == NULL || relay->running || relay->start > time(NULL))
402
            break;
403
        if (relay->on_demand && source->on_demand_req == 0)
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
        {
            relay->source->on_demand = relay->on_demand;

            if (source->fallback_mount && source->fallback_override)
            {
                source_t *fallback;
                avl_tree_rlock (global.source_tree);
                fallback = source_find_mount (source->fallback_mount);
                if (fallback && fallback->running && fallback->listeners)
                {
                   DEBUG2 ("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
                   source->on_demand_req = 1;
                }
                avl_tree_unlock (global.source_tree);
            }
            if (source->on_demand_req == 0)
                break;
        }

423
        relay->start = time(NULL) + 5;
424
        relay->running = 1;
425
426
427
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
428

429
    } while (0);
430
    /* the relay thread may of shut down itself */
431
    if (relay->cleanup)
432
    {
433
434
435
436
437
438
        if (relay->thread)
        {
            DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
            thread_join (relay->thread);
            relay->thread = NULL;
        }
439
440
        relay->cleanup = 0;
        relay->running = 0;
441

442
        if (relay->on_demand && relay->source)
443
444
445
446
447
448
449
        {
            ice_config_t *config = config_get_config ();
            mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
            source_update_settings (config, relay->source, mountinfo);
            config_release_config ();
            stats_event (relay->localmount, "listeners", "0");
        }
Michael Smith's avatar
Michael Smith committed
450
    }
451
452
}

Michael Smith's avatar
Michael Smith committed
453

454
455
456
457
458
459
460
461
462
463
464
465
466
/* compare the 2 relays to see if there are any changes, return 1 if
 * the relay needs to be restarted, 0 otherwise
 */
static int relay_has_changed (relay_server *new, relay_server *old)
{
    do
    {
        if (strcmp (new->mount, old->mount) != 0)
            break;
        if (strcmp (new->server, old->server) != 0)
            break;
        if (new->port != old->port)
            break;
467
468
        if (new->mp3metadata != old->mp3metadata)
            break;
469
470
        if (new->on_demand != old->on_demand)
            old->on_demand = new->on_demand;
471
472
473
474
475
476
        return 0;
    } while (0);
    return 1;
}


477
478
479
480
481
482
483
484
485
486
487
488
489
/* go through updated looking for relays that are different configured. The
 * returned list contains relays that should be kept running, current contains
 * the list of relays to shutdown
 */
static relay_server *
update_relay_set (relay_server **current, relay_server *updated)
{
    relay_server *relay = updated;
    relay_server *existing_relay, **existing_p;
    relay_server *new_list = NULL;

    while (relay)
    {
490
491
492
493
494
495
496
        existing_relay = *current;
        existing_p = current;

        while (existing_relay)
        {
            /* break out if keeping relay */
            if (strcmp (relay->localmount, existing_relay->localmount) == 0)
497
498
                if (relay_has_changed (relay, existing_relay) == 0)
                    break;
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
            existing_p = &existing_relay->next;
            existing_relay = existing_relay->next;
        }
        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
            existing_relay = relay_copy (relay);
        }
        else
        {
            *existing_p = existing_relay->next;
        }
        existing_relay->next = new_list;
        new_list = existing_relay;
        relay = relay->next;
514
    }
515
516
517
518
519
520
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
521
 * are separated and returned in a separate list
522
 */
523
static relay_server *
524
525
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
526
    relay_server *active_relays, *cleanup_relays;
527

528
    active_relays = update_relay_set (relay_list, new_relay_list);
529

530
531
532
533
534
535
536
537
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


538
539
static void relay_check_streams (relay_server *to_start,
        relay_server *to_free, int skip_timer)
540
541
542
543
{
    relay_server *relay;

    while (to_free)
544
    {
545
        if (to_free->source)
546
        {
547
548
549
550
            if (to_free->running)
            {
                /* relay has been removed from xml, shut down active relay */
                DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
551
                to_free->running = 0;
552
553
554
555
556
                to_free->source->running = 0;
                thread_join (to_free->thread);
            }
            else
                stats_event (to_free->localmount, NULL, NULL);
557
        }
558
559
560
561
562
563
        to_free = relay_free (to_free);
    }

    relay = to_start;
    while (relay)
    {
564
565
        if (skip_timer)
            relay->start = 0;
566
567
        check_relay_stream (relay);
        relay = relay->next;
568
    }
Michael Smith's avatar
Michael Smith committed
569
570
}

571
572
573
574
575

static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username= NULL;
    int port;
576
    sock_t mastersock;
577
    int ret = 0;
578
    char buf[256];
579
580
581
    do
    {
        char *authheader, *data;
582
        relay_server *new_relays = NULL, *cleanup_relays;
583
        int len, count = 1;
584
        int on_demand;
585

586
        username = strdup (config->master_username);
587
588
        if (config->master_password)
            password = strdup (config->master_password);
Michael Smith's avatar
Michael Smith committed
589

590
591
        if (config->master_server)
            master = strdup (config->master_server);
Michael Smith's avatar
Michael Smith committed
592

593
        port = config->master_server_port;
Michael Smith's avatar
Michael Smith committed
594

595
596
        if (password == NULL || master == NULL || port == 0)
            break;
597
        on_demand = config->on_demand;
598
599
600
        ret = 1;
        config_release_config();
        mastersock = sock_connect_wto (master, port, 0);
601

602
603
604
605
606
        if (mastersock == SOCK_ERROR)
        {
            WARN0("Relay slave failed to contact master server to fetch stream list");
            break;
        }
Michael Smith's avatar
Michael Smith committed
607

608
609
610
        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        snprintf (authheader, len, "%s:%s", username, password);
611
612
613
614
615
616
617
618
        data = util_base64_encode(authheader);
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(authheader);
        free(data);

619
620
621
622
623
624
625
626
        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
                strncmp (buf, "HTTP/1.0 200", 12) != 0)
        {
            sock_close (mastersock);
            WARN0 ("Master rejected streamlist request");
            break;
        }

627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            relay_server *r;
            if (!strlen(buf))
                continue;
            DEBUG2 ("read %d from master \"%s\"", count++, buf);
            r = calloc (1, sizeof (relay_server));
            if (r)
            {
                r->server = xmlStrdup (master);
                r->port = port;
                r->mount = xmlStrdup (buf);
                r->localmount = xmlStrdup (buf);
                r->mp3metadata = 1;
646
                r->on_demand = on_demand;
647
648
                r->next = new_relays;
                new_relays = r;
649
            }
Michael Smith's avatar
Michael Smith committed
650
        }
651
652
        sock_close (mastersock);

653
654
655
        thread_mutex_lock (&(config_locks()->relay_lock));
        cleanup_relays = update_relays (&global.master_relays, new_relays);
        
656
657
        relay_check_streams (global.master_relays, cleanup_relays, 0);
        relay_check_streams (NULL, new_relays, 0);
658
659
660

        thread_mutex_unlock (&(config_locks()->relay_lock));

661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
    } while(0);

    if (master)
        free (master);
    if (username)
        free (username);
    if (password)
        free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
677
    unsigned int interval = 0;
Michael Smith's avatar
Michael Smith committed
678

679
680
681
    update_settings = 0;
    update_streams = 0;

Karl Heyes's avatar
Karl Heyes committed
682
683
684
    config = config_get_config();
    stats_global (config);
    config_release_config();
685
    source_recheck_mounts (1);
686

687
    while (1)
688
    {
689
690
        relay_server *cleanup_relays = NULL;
        int skip_timer = 0;
691

692
693
694
695
696
697
698
        /* re-read xml file if requested */
        if (global . schedule_config_reread)
        {
            event_config_read (NULL);
            global . schedule_config_reread = 0;
        }

699
        thread_sleep (1000000);
700
701
        if (slave_running == 0)
            break;
702
703

        ++interval;
704

705
706
707
708
709
        /* only update relays lists when required */
        if (max_interval <= interval)
        {
            DEBUG0 ("checking master stream list");
            config = config_get_config();
Michael Smith's avatar
Michael Smith committed
710

711
712
            if (max_interval == 0)
                skip_timer = 1;
713
714
            interval = 0;
            max_interval = config->master_update_interval;
Michael Smith's avatar
Michael Smith committed
715

716
717
718
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
719

720
            thread_mutex_lock (&(config_locks()->relay_lock));
721

722
            cleanup_relays = update_relays (&global.relays, config->relay);
723

724
725
726
727
            config_release_config();
        }
        else
            thread_mutex_lock (&(config_locks()->relay_lock));
728
729
730
731
732

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

733
734
        if (update_settings)
        {
735
            source_recheck_mounts (update_streams);
736
            update_settings = 0;
737
            update_streams = 0;
738
        }
739
    }
740
    INFO0 ("shutting down current relays");
741
742
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
743

744
745
    INFO0 ("Slave thread shutdown complete");

746
    return NULL;
747
}
Michael Smith's avatar
Michael Smith committed
748