slave.c 28.3 KB
Newer Older
1
2
3
4
5
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14
15
16
17
18
19
20
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

21
22
23
24
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

25
26
27
28
29
30
31
32
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#endif

37
#include "compat.h"
38

39
#include <libxml/uri.h>
Marvin Scholz's avatar
Marvin Scholz committed
40
41
42
43
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "common/net/sock.h"
#include "common/httpp/httpp.h"
44

45
#include "slave.h"
46
#include "cfgfile.h"
47
48
49
50
51
52
53
54
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
Michael Smith's avatar
Michael Smith committed
55
#include "format.h"
56
57
58

#define CATMODULE "slave"

59
60
61
62
63
64
65
66
67
68
/* Runtime state of one relay; instances are chained into singly linked
 * lists through the next member.
 */
struct relay_tag {
    relay_config_t *config; /* private copy of the configuration, made by relay_new() */
    source_t *source;       /* source reserved for the local mount, NULL until reserved */
    int running;            /* set to 1 when a relay thread has been started */
    int cleanup;            /* set by the relay thread when it has finished */
    time_t start;           /* the relay is not (re)started before this time */
    thread_type *thread;    /* relay thread handle, joined once cleanup is seen */
    relay_t *next;          /* next relay in the list */
};

69
static void *_slave_thread(void *arg);
70
static thread_type *_slave_thread_id;
71
static int slave_running = 0;
72
static volatile int update_settings = 0;
73
static volatile int update_all_mounts = 0;
74
static volatile unsigned int max_interval = 0;
75
static mutex_t _slave_mutex; // protects slave_running, update_settings, update_all_mounts, max_interval
76

77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/* Release the strings owned by one upstream description. The structure
 * itself is not freed: callers pass the address of a member or of an array
 * element of a relay_config_t.
 */
static inline void relay_config_upstream_free (relay_config_upstream_t *upstream)
{
    if (upstream->server)
        xmlFree(upstream->server);
    if (upstream->mount)
        xmlFree(upstream->mount);
    if (upstream->username)
        xmlFree(upstream->username);
    if (upstream->password)
        xmlFree(upstream->password);
    /* bind is duplicated by relay_config_upstream_copy(), so it is owned
     * like the other strings */
    if (upstream->bind)
        xmlFree(upstream->bind);
}

/* Free a relay configuration: every upstream in the list, the default
 * upstream, the local mount name, the upstream array and the structure.
 */
void relay_config_free (relay_config_t *relay)
{
    size_t i;

    ICECAST_LOG_DEBUG("freeing relay config for %s", relay->localmount);

    for (i = 0; i < relay->upstreams; i++) {
        relay_config_upstream_free(&(relay->upstream[i]));
    }

    relay_config_upstream_free(&(relay->upstream_default));

    xmlFree(relay->localmount);
    free(relay->upstream);
    free(relay);
}

/* Free a relay together with its source (if any) and its configuration.
 * Returns the relay that followed it in the list, so that a list can be
 * released with `relay = relay_free(relay)`.
 */
relay_t *relay_free (relay_t *relay)
{
    relay_t *next = relay->next;

    ICECAST_LOG_DEBUG("freeing relay %s", relay->config->localmount);

    if (relay->source)
       source_free_source (relay->source);

    relay_config_free(relay->config);

    free(relay);
    return next;
}


/* Deep copy the settings of src into dst.
 *
 * Optional strings are only assigned when set in src, so dst has to be
 * zero initialised by the caller (relay_config_copy() passes calloc()ed
 * memory). The bind address is copied as well: open_relay_connection()
 * reads it from the copied configuration.
 */
static inline void relay_config_upstream_copy(relay_config_upstream_t *dst, const relay_config_upstream_t *src)
{
    dst->server = (char *)xmlCharStrdup(src->server);
    dst->mount = (char *)xmlCharStrdup(src->mount);

    if (src->username)
        dst->username = (char *)xmlCharStrdup(src->username);
    if (src->password)
        dst->password = (char *)xmlCharStrdup(src->password);
    if (src->bind)
        dst->bind = (char *)xmlCharStrdup(src->bind);

    dst->port = src->port;

    dst->mp3metadata = src->mp3metadata;
}

/* Create a deep copy of the relay configuration r.
 *
 * Returns the newly allocated copy, or NULL if the structure or the
 * upstream array could not be allocated.
 */
static inline relay_config_t *relay_config_copy (relay_config_t *r)
{
    relay_config_t *dup = calloc(1, sizeof(*dup));
    relay_config_upstream_t *list = NULL;
    size_t idx;

    if (r->upstreams)
        list = calloc(r->upstreams, sizeof(*list));

    /* either allocation failing aborts the copy; free(NULL) is a no-op */
    if (dup == NULL || (r->upstreams && list == NULL)) {
        free(list);
        free(dup);
        return NULL;
    }

    dup->upstreams = r->upstreams;
    dup->upstream = list;

    dup->on_demand = r->on_demand;
    dup->localmount = (char *)xmlCharStrdup(r->localmount);

    relay_config_upstream_copy(&dup->upstream_default, &r->upstream_default);

    for (idx = 0; idx < r->upstreams; idx++) {
        relay_config_upstream_copy(&dup->upstream[idx], &r->upstream[idx]);
    }

    return dup;
}
170

171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/* Allocate a zeroed relay that owns a private copy of config.
 * Returns NULL if either the relay or the configuration copy can not be
 * allocated.
 */
static inline relay_t *relay_new(relay_config_t *config)
{
    relay_t *relay = calloc(1, sizeof(relay_t));

    if (relay != NULL) {
        relay->config = relay_config_copy(config);
        if (relay->config == NULL) {
            free(relay);
            relay = NULL;
        }
    }

    return relay;
}
186

187
/* force a recheck of the relays. This will recheck the master server if
188
 * this is a slave and rebuild all mountpoints in the stats tree
189
 */
190
void slave_update_all_mounts(void)
191
{
192
    thread_mutex_lock(&_slave_mutex);
193
    max_interval = 0;
194
    update_all_mounts = 1;
195
    update_settings = 1;
196
    thread_mutex_unlock(&_slave_mutex);
197
198
}

199
200
201

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
202
 */
203
void slave_rebuild_mounts(void)
204
{
205
    thread_mutex_lock(&_slave_mutex);
206
    update_settings = 1;
207
    thread_mutex_unlock(&_slave_mutex);
208
209
}

210
211

void slave_initialize(void)
Michael Smith's avatar
Michael Smith committed
212
{
213
214
    if (slave_running)
        return;
Michael Smith's avatar
Michael Smith committed
215

216
    slave_running = 1;
217
    max_interval = 0;
218
    thread_mutex_create (&_slave_mutex);
219
220
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
221

Michael Smith's avatar
Michael Smith committed
222

223
224
void slave_shutdown(void)
{
225
226
227
    thread_mutex_lock(&_slave_mutex);
    if (!slave_running) {
        thread_mutex_unlock(&_slave_mutex);
228
        return;
229
    }
230
    slave_running = 0;
231
232
    thread_mutex_unlock(&_slave_mutex);

233
    ICECAST_LOG_DEBUG("waiting for slave thread");
234
235
236
237
    thread_join (_slave_thread_id);
}


238
239
/* Actually open the connection and do some http parsing, handle any 302
 * responses within here.
240
 */
241
242
#define _GET_UPSTREAM_SETTING(n) ((upstream && upstream->n) ? upstream->n : relay->config->upstream_default.n)
static client_t *open_relay_connection (relay_t *relay, relay_config_upstream_t *upstream)
243
{
244
    int redirects = 0;
245
246
    char *server_id = NULL;
    ice_config_t *config;
247
248
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
249
250
251
    char *server = strdup (_GET_UPSTREAM_SETTING(server));
    char *mount = strdup (_GET_UPSTREAM_SETTING(mount));
    int port = _GET_UPSTREAM_SETTING(port);
252
    char *auth_header;
253
254
    char header[4096];

255
256
257
258
    config = config_get_config ();
    server_id = strdup (config->server_id);
    config_release_config ();

259
    /* build any authentication header before connecting */
260
    if (_GET_UPSTREAM_SETTING(username) && _GET_UPSTREAM_SETTING(password))
261
    {
262
        char *esc_authorisation;
263
        unsigned len = strlen(_GET_UPSTREAM_SETTING(username)) + strlen(_GET_UPSTREAM_SETTING(password)) + 2;
264
265

        auth_header = malloc (len);
266
        snprintf (auth_header, len, "%s:%s", _GET_UPSTREAM_SETTING(username), _GET_UPSTREAM_SETTING(password));
267
        esc_authorisation = util_base64_encode(auth_header, len);
268
269
270
271
272
273
274
275
276
        free(auth_header);
        len = strlen (esc_authorisation) + 24;
        auth_header = malloc (len);
        snprintf (auth_header, len,
                "Authorization: Basic %s\r\n", esc_authorisation);
        free(esc_authorisation);
    }
    else
        auth_header = strdup ("");
277

278
279
280
281
    while (redirects < 10)
    {
        sock_t streamsock;

282
        ICECAST_LOG_INFO("connecting to %s:%d", server, port);
283

284
        streamsock = sock_connect_wto_bind (server, port, _GET_UPSTREAM_SETTING(bind), 10);
285
286
        if (streamsock == SOCK_ERROR)
        {
287
            ICECAST_LOG_WARN("Failed to connect to %s:%d", server, port);
288
            break;
Michael Smith's avatar
Michael Smith committed
289
        }
290
        con = connection_create(streamsock, NULL, NULL, strdup(server));
291

292
293
294
295
296
297
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
298
                "User-Agent: %s\r\n"
Karl Heyes's avatar
Karl Heyes committed
299
                "Host: %s\r\n"
300
                "%s"
301
                "%s"
302
                "\r\n",
303
                mount,
304
                server_id,
Karl Heyes's avatar
Karl Heyes committed
305
                server,
306
                _GET_UPSTREAM_SETTING(mp3metadata) ? "Icy-MetaData: 1\r\n" : "",
307
                auth_header);
308
        memset (header, 0, sizeof(header));
309
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
310
        {
311
            ICECAST_LOG_ERROR("Header read failed for %s (%s:%d%s)", relay->config->localmount, server, port, mount);
312
313
314
315
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
316
        if (! httpp_parse_response (parser, header, strlen(header), relay->config->localmount))
317
        {
318
            ICECAST_LOG_ERROR("Error parsing relay request for %s (%s:%d%s)", relay->config->localmount,
319
                    server, port, mount);
320
321
            break;
        }
322
        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
323
        {
324
325
326
            /* better retry the connection again but with different details */
            const char *uri, *mountpoint;
            int len;
327

328
            uri = httpp_getvar (parser, "location");
329
            ICECAST_LOG_INFO("redirect received %s", uri);
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
            if (strncmp (uri, "http://", 7) != 0)
                break;
            uri += 7;
            mountpoint = strchr (uri, '/');
            free (mount);
            if (mountpoint)
                mount = strdup (mountpoint);
            else
                mount = strdup ("/");

            len = strcspn (uri, ":/");
            port = 80;
            if (uri [len] == ':')
                port = atoi (uri+len+1);
            free (server);
            server = calloc (1, len+1);
            strncpy (server, uri, len);
            connection_close (con);
            httpp_destroy (parser);
349
350
351
            con = NULL;
            parser = NULL;
        }
352
353
354
355
356
357
        else
        {
            client_t *client = NULL;

            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
            {
358
                ICECAST_LOG_ERROR("Error from relay request: %s (%s)", relay->config->localmount,
359
360
361
362
363
364
365
366
367
368
369
370
371
372
                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                break;
            }
            global_lock ();
            if (client_create (&client, con, parser) < 0)
            {
                global_unlock ();
                /* make sure only the client_destory frees these */
                con = NULL;
                parser = NULL;
                client_destroy (client);
                break;
            }
            global_unlock ();
373
            sock_set_blocking (streamsock, 0);
374
            client_set_queue (client, NULL);
375
            client_complete(client);
376
377
            free (server);
            free (mount);
378
            free (server_id);
379
380
381
382
383
384
385
386
387
            free (auth_header);

            return client;
        }
        redirects++;
    }
    /* failed, better clean up */
    free (server);
    free (mount);
388
    free (server_id);
389
390
391
392
393
394
395
396
397
398
399
400
401
402
    free (auth_header);
    if (con)
        connection_close (con);
    if (parser)
        httpp_destroy (parser);
    return NULL;
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
static void *start_relay_stream (void *arg)
{
403
    relay_t *relay = arg;
404
405
406
    source_t *src = relay->source;
    client_t *client;

407
    ICECAST_LOG_INFO("Starting relayed source at mountpoint \"%s\"", relay->config->localmount);
408
409
    do
    {
410
411
412
413
414
415
416
417
418
419
420
421
422
423
        size_t i;

        for (i = 0; i < relay->config->upstreams; i++) {
            ICECAST_LOG_DEBUG("For relay on mount \"%s\", trying upstream #%zu", relay->config->localmount, i);
            client = open_relay_connection(relay, &(relay->config->upstream[i]));
            if (client)
                break;
        }

        /* if we have no upstreams defined, use the default upstream */
        if (!relay->config->upstreams) {
            ICECAST_LOG_DEBUG("For relay on mount \"%s\" with no upstreams trying upstream default", relay->config->localmount);
            client = open_relay_connection(relay, NULL);
        }
424
425
426
427
428
429
430

        if (client == NULL)
            continue;

        src->client = client;
        src->parser = client->parser;
        src->con = client->con;
431
432

        if (connection_complete_source (src, 0) < 0)
433
        {
434
            ICECAST_LOG_INFO("Failed to complete source initialisation");
435
436
437
            client_destroy (client);
            src->client = NULL;
            continue;
438
        }
439
        stats_event_inc(NULL, "source_relay_connections");
440
        stats_event (relay->config->localmount, "source_ip", client->con->ip);
441

442
443
        source_main (relay->source);

444
        if (relay->config->on_demand == 0)
445
446
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
447
            yp_remove (relay->config->localmount);
448
            relay->source->yp_public = -1;
449
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
450
            slave_update_all_mounts();
451
452
        }

453
        /* we've finished, now get cleaned up */
454
        relay->cleanup = 1;
455
        slave_rebuild_mounts();
456
457

        return NULL;
458
    } while (0); /* TODO allow looping through multiple servers */
459

460
461
462
463
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

464
        ICECAST_LOG_DEBUG("failed relay, fallback to %s", relay->source->fallback_mount);
465
        avl_tree_rlock(global.source_tree);
466
        fallback_source = source_find_mount(relay->source->fallback_mount);
467
468

        if (fallback_source != NULL)
469
            source_move_clients(relay->source, fallback_source);
470

471
        avl_tree_unlock(global.source_tree);
472
473
    }

474
    source_clear_source(relay->source);
475

476
    /* cleanup relay, but prevent this relay from starting up again too soon */
477
478
    thread_mutex_lock(&_slave_mutex);
    thread_mutex_lock(&(config_locks()->relay_lock));
Karl Heyes's avatar
Karl Heyes committed
479
    relay->source->on_demand = 0;
480
    relay->start = time(NULL) + max_interval;
481
    relay->cleanup = 1;
482
483
    thread_mutex_unlock(&(config_locks()->relay_lock));
    thread_mutex_unlock(&_slave_mutex);
484
485

    return NULL;
486
487
488
489
}


/* wrapper for starting the provided relay stream */
490
static void check_relay_stream (relay_t *relay)
491
492
493
{
    if (relay->source == NULL)
    {
494
        if (relay->config->localmount[0] != '/')
495
        {
496
            ICECAST_LOG_WARN("relay mountpoint \"%s\" does not start with /, skipping",
497
                    relay->config->localmount);
498
499
            return;
        }
500
        /* new relay, reserve the name */
501
        relay->source = source_reserve (relay->config->localmount);
502
        if (relay->source)
503
        {
504
505
            ICECAST_LOG_DEBUG("Adding relay source at mountpoint \"%s\"", relay->config->localmount);
            if (relay->config->on_demand)
Karl Heyes's avatar
Karl Heyes committed
506
507
            {
                ice_config_t *config = config_get_config ();
508
509
                mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
                relay->source->on_demand = relay->config->on_demand;
Karl Heyes's avatar
Karl Heyes committed
510
511
512
                if (mountinfo == NULL)
                    source_update_settings (config, relay->source, mountinfo);
                config_release_config ();
513
                stats_event (relay->config->localmount, "listeners", "0");
514
                slave_update_all_mounts();
Karl Heyes's avatar
Karl Heyes committed
515
            }
516
        }
517
        else
518
519
520
        {
            if (relay->start == 0)
            {
521
                ICECAST_LOG_WARN("new relay but source \"%s\" already exists", relay->config->localmount);
522
523
524
525
                relay->start = 1;
            }
            return;
        }
526
    }
527
    do
528
    {
529
        source_t *source = relay->source;
530
531
        /* skip relay if active, not configured or just not time yet */
        if (relay->source == NULL || relay->running || relay->start > time(NULL))
532
            break;
533
        /* check if an inactive on-demand relay has a fallback that has listeners */
534
        if (relay->config->on_demand && source->on_demand_req == 0)
535
        {
536
            relay->source->on_demand = relay->config->on_demand;
537
538
539
540
541
542
543
544

            if (source->fallback_mount && source->fallback_override)
            {
                source_t *fallback;
                avl_tree_rlock (global.source_tree);
                fallback = source_find_mount (source->fallback_mount);
                if (fallback && fallback->running && fallback->listeners)
                {
545
                   ICECAST_LOG_DEBUG("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
546
547
548
549
550
551
552
553
                   source->on_demand_req = 1;
                }
                avl_tree_unlock (global.source_tree);
            }
            if (source->on_demand_req == 0)
                break;
        }

554
        relay->start = time(NULL) + 5;
555
        relay->running = 1;
556
557
558
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
559

560
    } while (0);
561
    /* the relay thread may of shut down itself */
562
    if (relay->cleanup)
563
    {
564
565
        if (relay->thread)
        {
566
            ICECAST_LOG_DEBUG("waiting for relay thread for \"%s\"", relay->config->localmount);
567
568
569
            thread_join (relay->thread);
            relay->thread = NULL;
        }
570
571
        relay->cleanup = 0;
        relay->running = 0;
572

573
        if (relay->config->on_demand && relay->source)
574
575
        {
            ice_config_t *config = config_get_config ();
576
            mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
577
578
            source_update_settings (config, relay->source, mountinfo);
            config_release_config ();
579
            stats_event (relay->config->localmount, "listeners", "0");
580
        }
Michael Smith's avatar
Michael Smith committed
581
    }
582
583
}

Michael Smith's avatar
Michael Smith committed
584

585
586
587
/* compare the 2 relays to see if there are any changes, return 1 if
 * the relay needs to be restarted, 0 otherwise
 */
588
589
590
#define _EQ_STR(a,b) (((a) == (b)) || ((a) != NULL && (b) != NULL && strcmp((a), (b)) == 0))
#define _EQ_ATTR(x) (_EQ_STR((new->x), (old->x)))
static int relay_has_changed_upstream(const relay_config_upstream_t *new, const relay_config_upstream_t *old)
591
{
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
    if (new->mp3metadata != old->mp3metadata)
        return 1;

    if (!_EQ_ATTR(server) || new->port != old->port)
        return 1;

    if (!_EQ_ATTR(mount))
        return 1;

/* NOTE: We currently do not consider this a relevant change. Why?
    if (!_EQ_ATTR(username) || !_EQ_ATTR(password))
        return 1;

    if (!_EQ_ATTR(bind))
        return 1;
*/

    return 0;
}

/* Compare two relay configurations. Returns 1 if the relay has to be
 * restarted, 0 otherwise. When nothing relevant differs, the on_demand
 * setting of new is carried over into old as a side effect.
 */
static int relay_has_changed (const relay_config_t *new, relay_config_t *old)
{
    size_t idx = 0;

    /* This is not fully true: If more upstreams has been added there is no reason
     * to restart the relay. However for now we ignore this case. TODO: Change this.
     */
    if (new->upstreams != old->upstreams)
        return 1;

    while (idx < new->upstreams) {
        if (relay_has_changed_upstream(&new->upstream[idx], &old->upstream[idx]))
            return 1;
        idx++;
    }

    if (relay_has_changed_upstream(&new->upstream_default, &old->upstream_default))
        return 1;

    /* Why do we do this here? */
    old->on_demand = new->on_demand;

    return 0;
}


637
638
639
640
/* go through updated looking for relays that are different configured. The
 * returned list contains relays that should be kept running, current contains
 * the list of relays to shutdown
 */
641
642
static relay_t *
update_relay_set(relay_t **current, relay_config_t **updated, size_t updated_length)
643
{
644
645
646
647
648
649
650
    relay_config_t *relay;
    relay_t *existing_relay, **existing_p;
    relay_t *new_list = NULL;
    size_t i;

    for (i = 0; i < updated_length; i++) {
        relay = updated[i];
651

652
653
654
655
656
657
        existing_relay = *current;
        existing_p = current;

        while (existing_relay)
        {
            /* break out if keeping relay */
658
659
            if (strcmp(relay->localmount, existing_relay->config->localmount) == 0)
                if (relay_has_changed(relay, existing_relay->config) == 0)
660
                    break;
661
            existing_p = &existing_relay->next;
662

663
664
            existing_relay = existing_relay->next;
        }
665
666


667
668
669
        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
670
            existing_relay = relay_new(relay);
671
672
673
674
675
676
677
        }
        else
        {
            *existing_p = existing_relay->next;
        }
        existing_relay->next = new_list;
        new_list = existing_relay;
678
    }
679

680
681
682
683
684
685
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
686
 * are separated and returned in a separate list
687
 */
688
689
static relay_t *
update_relays (relay_t **relay_list, relay_config_t **new_relay_list, size_t new_relay_list_length)
690
{
691
    relay_t *active_relays, *cleanup_relays;
692

693
    active_relays = update_relay_set(relay_list, new_relay_list, new_relay_list_length);
694

695
696
697
698
699
700
701
702
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


703
704
/* Shut down and free every relay in to_free, then run check_relay_stream()
 * on every relay in to_start. With skip_timer set, the start time of each
 * relay in to_start is reset to 0 first.
 */
static void relay_check_streams (relay_t *to_start,
        relay_t *to_free, int skip_timer)
{
    relay_t *cur;

    /* relay_free() hands back the next entry, which drives the loop */
    for (cur = to_free; cur != NULL; cur = relay_free (cur))
    {
        if (cur->source == NULL)
            continue;

        if (!cur->running)
        {
            stats_event (cur->config->localmount, NULL, NULL);
            continue;
        }

        /* relay has been removed from xml, shut down active relay */
        ICECAST_LOG_DEBUG("source shutdown request on \"%s\"", cur->config->localmount);
        cur->running = 0;
        cur->source->running = 0;
        thread_join (cur->thread);
    }

    for (cur = to_start; cur != NULL; cur = cur->next)
    {
        if (skip_timer)
            cur->start = 0;
        check_relay_stream (cur);
    }
}

736
737
738
739
740

/* Fetch the stream list from the master server and update
 * global.master_relays accordingly.
 *
 * config is the configuration as obtained (and still held) by the caller.
 *
 * Returns 0 if no complete master setup (username, password, server and
 * port) is configured; config is then left untouched and not released.
 * Returns 1 otherwise, in which case config_release_config() has been
 * called here before the network request, whether or not the request
 * succeeded.
 */
static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username = NULL;
    int port;
    sock_t mastersock;
    int ret = 0;
    char buf[256];

    do
    {
        char *authheader, *data;
        relay_t *cleanup_relays;
        relay_config_t **new_relays = NULL;
        size_t new_relays_length = 0;
        int len, count = 1;
        int on_demand;
        size_t i;

        if (config->master_username)
            username = strdup(config->master_username);
        if (config->master_password)
            password = strdup(config->master_password);

        if (config->master_server)
            master = strdup(config->master_server);

        port = config->master_server_port;

        if (username == NULL || password == NULL || master == NULL || port == 0)
            break;
        on_demand = config->on_demand;
        ret = 1;
        config_release_config();
        mastersock = sock_connect_wto(master, port, 10);

        if (mastersock == SOCK_ERROR)
        {
            ICECAST_LOG_WARN("Relay slave failed to contact master server to fetch stream list");
            break;
        }

        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        if (authheader == NULL)
        {
            sock_close (mastersock);
            break;
        }
        snprintf (authheader, len, "%s:%s", username, password);
        /* NOTE(review): len also counts the terminating NUL; confirm that
         * util_base64_encode() is meant to be given it. */
        data = util_base64_encode(authheader, len);
        free(authheader);
        if (data == NULL)
        {
            sock_close (mastersock);
            break;
        }
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(data);

        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
                ((strncmp (buf, "HTTP/1.0 200", 12) != 0) && (strncmp (buf, "HTTP/1.1 200", 12) != 0)))
        {
            sock_close (mastersock);
            ICECAST_LOG_WARN("Master rejected streamlist request");
            break;
        } else {
            ICECAST_LOG_INFO("Master accepted streamlist request");
        }

        /* skip the remaining response headers up to the empty line */
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }

        /* every non-empty line of the body describes one stream */
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            relay_config_t *c;
            relay_config_t **n;
            xmlURIPtr parsed_uri;

            if (!strlen(buf))
                continue;
            ICECAST_LOG_DEBUG("read %d from master \"%s\"", count++, buf);
            parsed_uri = xmlParseURI(buf);
            if (parsed_uri == NULL) {
                ICECAST_LOG_DEBUG("Error while parsing line from master. Ignoring line.");
                continue;
            }
            if (parsed_uri->path == NULL) {
                /* without a path there is no mount to relay */
                ICECAST_LOG_DEBUG("Line from master has no path. Ignoring line.");
                xmlFreeURI(parsed_uri);
                continue;
            }

            c = calloc(1, sizeof(*c));
            if (c) {
                /* the strings are released with xmlFree() by
                 * relay_config_free(), so allocate them via libxml2 too */
                if (parsed_uri->server != NULL) {
                    c->upstream_default.server = (char *)xmlCharStrdup(parsed_uri->server);
                    if (parsed_uri->port == 0) {
                        c->upstream_default.port = 80;
                    } else {
                        c->upstream_default.port = parsed_uri->port;
                    }
                } else {
                    c->upstream_default.server = (char *)xmlCharStrdup (master);
                    c->upstream_default.port = port;
                }

                c->upstream_default.mount = (char *)xmlCharStrdup(parsed_uri->path);
                c->localmount = (char *)xmlCharStrdup(parsed_uri->path);
                c->upstream_default.mp3metadata = 1;
                c->on_demand = on_demand;

                if (!c->upstream_default.server || !c->upstream_default.mount || !c->localmount) {
                    /* incomplete entry, drop it rather than queue it */
                    ICECAST_LOG_ERROR("Out of memory while adding relay from master. Ignoring line.");
                    if (c->upstream_default.server)
                        xmlFree(c->upstream_default.server);
                    if (c->upstream_default.mount)
                        xmlFree(c->upstream_default.mount);
                    if (c->localmount)
                        xmlFree(c->localmount);
                    free(c);
                    c = NULL;
                }
            }

            if (c) {
                n = realloc(new_relays, sizeof(*new_relays)*(new_relays_length + 1));
                if (n) {
                    new_relays = n;
                    new_relays[new_relays_length++] = c;
                    ICECAST_LOG_DEBUG("Added relay host=\"%s\", port=%d, mount=\"%s\"", c->upstream_default.server, c->upstream_default.port, c->upstream_default.mount);
                } else {
                    relay_config_free(c);
                }
            }
            xmlFreeURI(parsed_uri);
        }
        sock_close (mastersock);

        thread_mutex_lock (&(config_locks()->relay_lock));
        cleanup_relays = update_relays (&global.master_relays, new_relays, new_relays_length);

        relay_check_streams (global.master_relays, cleanup_relays, 0);

        /* the relays hold their own copies of the configurations */
        for (i = 0; i < new_relays_length; i++) {
            relay_config_free(new_relays[i]);
        }
        free(new_relays);

        thread_mutex_unlock (&(config_locks()->relay_lock));

    } while(0);

    free (master);
    free (username);
    free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
875
    unsigned int interval = 0;
Michael Smith's avatar
Michael Smith committed
876

877
878
    (void)arg;

879
    thread_mutex_lock(&_slave_mutex);
880
    update_settings = 0;
881
    update_all_mounts = 0;
882
    thread_mutex_unlock(&_slave_mutex);
883

Karl Heyes's avatar
Karl Heyes committed
884
    config = config_get_config();
885
    stats_global(config);
Karl Heyes's avatar
Karl Heyes committed
886
    config_release_config();
887
    source_recheck_mounts(1);
888

889
    while (1)
890
    {
891
        relay_t *cleanup_relays = NULL;
892
        int skip_timer = 0;
893

894
        /* re-read xml file if requested */
895
        global_lock();
896
897
        if (global.schedule_config_reread) {
            config_reread_config();
898
            global.schedule_config_reread = 0;
899
        }
900
        global_unlock();
901

902
        thread_sleep(1000000);
903
904
905
        thread_mutex_lock(&_slave_mutex);
        if (slave_running == 0) {
            thread_mutex_unlock(&_slave_mutex);
906
            break;
907
908
        }
        thread_mutex_unlock(&_slave_mutex);
909
910

        ++interval;
911

912
        /* only update relays lists when required */
913
        thread_mutex_lock(&_slave_mutex);
914
915
        if (max_interval <= interval)
        {
916
            ICECAST_LOG_DEBUG("checking master stream list");
917
            config = config_get_config();
Michael Smith's avatar
Michael Smith committed
918

919
920
            if (max_interval == 0)
                skip_timer = 1;
921
922
            interval = 0;
            max_interval = config->master_update_interval;
923
            thread_mutex_unlock(&_slave_mutex);
Michael Smith's avatar
Michael Smith committed
924

925
926
927
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
928

929
            thread_mutex_lock (&(config_locks()->relay_lock));
930

931
            cleanup_relays = update_relays(&global.relays, config->relay, config->relay_length);
932

933
934
935
            config_release_config();
        }
        else
936
937
        {
            thread_mutex_unlock(&_slave_mutex);
938
            thread_mutex_lock (&(config_locks()->relay_lock));
939
        }
940
941
942
943
944

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

945
        thread_mutex_lock(&_slave_mutex);
946
947
        if (update_settings)
        {
948
            source_recheck_mounts (update_all_mounts);
949
            update_settings = 0;
950
            update_all_mounts = 0;
951
        }
952
        thread_mutex_unlock(&_slave_mutex);
953
    }
954
    ICECAST_LOG_INFO("shutting down current relays");
955
956
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
957

958
    ICECAST_LOG_INFO("Slave thread shutdown complete");
959

960
    return NULL;
961
}
Michael Smith's avatar
Michael Smith committed
962