slave.c 28.4 KB
Newer Older
1
2
3
4
5
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
6
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
7
8
9
10
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
11
 * Copyright 2012-2018, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
12
13
 */

14
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
15
16
17
18
19
20
21
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

22
23
24
25
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

26
27
28
29
30
31
32
33
34
35
36
37
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#endif

38
#include "compat.h"
39

40
#include <libxml/uri.h>
Marvin Scholz's avatar
Marvin Scholz committed
41
42
43
44
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "common/net/sock.h"
#include "common/httpp/httpp.h"
45

46
#include "slave.h"
47
#include "cfgfile.h"
48
49
50
51
52
53
54
55
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
Michael Smith's avatar
Michael Smith committed
56
#include "format.h"
57
58
59

#define CATMODULE "slave"

60
61
62
63
64
65
66
67
68
69
struct relay_tag {
    relay_config_t *config;
    source_t *source;
    int running;
    int cleanup;
    time_t start;
    thread_type *thread;
    relay_t *next;
};

70
static void *_slave_thread(void *arg);
71
static thread_type *_slave_thread_id;
72
static int slave_running = 0;
73
static volatile int update_settings = 0;
74
static volatile int update_all_mounts = 0;
75
static volatile unsigned int max_interval = 0;
76
static mutex_t _slave_mutex; // protects slave_running, update_settings, update_all_mounts, max_interval
77

78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
static inline void relay_config_upstream_free (relay_config_upstream_t *upstream)
{
    if (upstream->server)
        xmlFree(upstream->server);
    if (upstream->mount)
        xmlFree(upstream->mount);
    if (upstream->username)
        xmlFree(upstream->username);
    if (upstream->password)
        xmlFree(upstream->password);
}

void relay_config_free (relay_config_t *relay)
{
    size_t i;

    ICECAST_LOG_DEBUG("freeing relay config for %s", relay->localmount);

    for (i = 0; i < relay->upstreams; i++) {
        relay_config_upstream_free(&(relay->upstream[i]));
    }

    relay_config_upstream_free(&(relay->upstream_default));

    xmlFree(relay->localmount);
    free(relay->upstream);
    free(relay);
}

relay_t *relay_free (relay_t *relay)
108
{
109
110
111
112
    relay_t *next = relay->next;

    ICECAST_LOG_DEBUG("freeing relay %s", relay->config->localmount);

113
114
    if (relay->source)
       source_free_source (relay->source);
115
116
117
118

    relay_config_free(relay->config);

    free(relay);
119
    return next;
120
121
}

122

123
static inline void relay_config_upstream_copy(relay_config_upstream_t *dst, const relay_config_upstream_t *src)
124
{
125
126
    dst->server = (char *)xmlCharStrdup(src->server);
    dst->mount = (char *)xmlCharStrdup(src->mount);
Michael Smith's avatar
Michael Smith committed
127

128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
    if (src->username)
        dst->username = (char *)xmlCharStrdup(src->username);
    if (src->password)
        dst->password = (char *)xmlCharStrdup(src->password);

    dst->port = src->port;

    dst->mp3metadata = src->mp3metadata;
}

static inline relay_config_t *relay_config_copy (relay_config_t *r)
{
    relay_config_t *copy = calloc (1, sizeof (relay_config_t));
    relay_config_upstream_t *u = NULL;
    size_t i;

    if (r->upstreams) {
        u = calloc(r->upstreams, sizeof(relay_config_upstream_t));
        if (!u) {
            free(copy);
            return NULL;
        }
    }

    if (!copy) {
        free(u);
        return NULL;
Michael Smith's avatar
Michael Smith committed
155
    }
156
157

    copy->upstream = u;
158
    copy->upstreams = r->upstreams;
159
160
161
162
163
164
165

    copy->localmount = (char *)xmlCharStrdup(r->localmount);
    copy->on_demand = r->on_demand;

    relay_config_upstream_copy(&(copy->upstream_default), &(r->upstream_default));

    for (i = 0; i < r->upstreams; i++)
Philipp Schafft's avatar
Philipp Schafft committed
166
        relay_config_upstream_copy(&(copy->upstream[i]), &(r->upstream[i]));
167
168


169
170
    return copy;
}
171

172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
static inline relay_t *relay_new(relay_config_t *config)
{
    relay_t *r = calloc(1, sizeof(*r));

    if (!r)
        return NULL;

    r->config = relay_config_copy(config);
    if (!r->config) {
        free(r);
        return NULL;
    }

    return r;
}
187

188
/* force a recheck of the relays. This will recheck the master server if
189
 * this is a slave and rebuild all mountpoints in the stats tree
190
 */
191
void slave_update_all_mounts(void)
192
{
193
    thread_mutex_lock(&_slave_mutex);
194
    max_interval = 0;
195
    update_all_mounts = 1;
196
    update_settings = 1;
197
    thread_mutex_unlock(&_slave_mutex);
198
199
}

200
201
202

/* Request slave thread to check the relay list for changes and to
 * update the stats for the current streams.
203
 */
204
void slave_rebuild_mounts(void)
205
{
206
    thread_mutex_lock(&_slave_mutex);
207
    update_settings = 1;
208
    thread_mutex_unlock(&_slave_mutex);
209
210
}

211
212

void slave_initialize(void)
Michael Smith's avatar
Michael Smith committed
213
{
214
215
    if (slave_running)
        return;
Michael Smith's avatar
Michael Smith committed
216

217
    slave_running = 1;
218
    max_interval = 0;
219
    thread_mutex_create (&_slave_mutex);
220
221
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
222

Michael Smith's avatar
Michael Smith committed
223

224
225
void slave_shutdown(void)
{
226
227
228
    thread_mutex_lock(&_slave_mutex);
    if (!slave_running) {
        thread_mutex_unlock(&_slave_mutex);
229
        return;
230
    }
231
    slave_running = 0;
232
233
    thread_mutex_unlock(&_slave_mutex);

234
    ICECAST_LOG_DEBUG("waiting for slave thread");
235
236
237
238
    thread_join (_slave_thread_id);
}


239
240
/* Actually open the connection and do some http parsing, handle any 302
 * responses within here.
241
 */
242
243
#define _GET_UPSTREAM_SETTING(n) ((upstream && upstream->n) ? upstream->n : relay->config->upstream_default.n)
static client_t *open_relay_connection (relay_t *relay, relay_config_upstream_t *upstream)
244
{
245
    int redirects = 0;
246
247
    char *server_id = NULL;
    ice_config_t *config;
248
249
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
250
251
252
    char *server = strdup (_GET_UPSTREAM_SETTING(server));
    char *mount = strdup (_GET_UPSTREAM_SETTING(mount));
    int port = _GET_UPSTREAM_SETTING(port);
253
    char *auth_header;
254
255
    char header[4096];

256
257
258
259
    config = config_get_config ();
    server_id = strdup (config->server_id);
    config_release_config ();

260
    /* build any authentication header before connecting */
261
    if (_GET_UPSTREAM_SETTING(username) && _GET_UPSTREAM_SETTING(password))
262
    {
263
        char *esc_authorisation;
264
        unsigned len = strlen(_GET_UPSTREAM_SETTING(username)) + strlen(_GET_UPSTREAM_SETTING(password)) + 2;
265
266

        auth_header = malloc (len);
267
        snprintf (auth_header, len, "%s:%s", _GET_UPSTREAM_SETTING(username), _GET_UPSTREAM_SETTING(password));
268
        esc_authorisation = util_base64_encode(auth_header, len);
269
270
271
272
273
274
275
276
277
        free(auth_header);
        len = strlen (esc_authorisation) + 24;
        auth_header = malloc (len);
        snprintf (auth_header, len,
                "Authorization: Basic %s\r\n", esc_authorisation);
        free(esc_authorisation);
    }
    else
        auth_header = strdup ("");
278

279
280
281
282
    while (redirects < 10)
    {
        sock_t streamsock;

283
        ICECAST_LOG_INFO("connecting to %s:%d", server, port);
284

285
        streamsock = sock_connect_wto_bind (server, port, _GET_UPSTREAM_SETTING(bind), 10);
286
287
        if (streamsock == SOCK_ERROR)
        {
288
            ICECAST_LOG_WARN("Failed to connect to %s:%d", server, port);
289
            break;
Michael Smith's avatar
Michael Smith committed
290
        }
291
        con = connection_create(streamsock, NULL, NULL, strdup(server));
292

293
294
295
296
297
298
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
299
                "User-Agent: %s\r\n"
Karl Heyes's avatar
Karl Heyes committed
300
                "Host: %s\r\n"
301
                "%s"
302
                "%s"
303
                "\r\n",
304
                mount,
305
                server_id,
Karl Heyes's avatar
Karl Heyes committed
306
                server,
307
                _GET_UPSTREAM_SETTING(mp3metadata) ? "Icy-MetaData: 1\r\n" : "",
308
                auth_header);
309
        memset (header, 0, sizeof(header));
310
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
311
        {
312
            ICECAST_LOG_ERROR("Header read failed for %s (%s:%d%s)", relay->config->localmount, server, port, mount);
313
314
315
316
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
317
        if (! httpp_parse_response (parser, header, strlen(header), relay->config->localmount))
318
        {
319
            ICECAST_LOG_ERROR("Error parsing relay request for %s (%s:%d%s)", relay->config->localmount,
320
                    server, port, mount);
321
322
            break;
        }
323
        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
324
        {
325
326
327
            /* better retry the connection again but with different details */
            const char *uri, *mountpoint;
            int len;
328

329
            uri = httpp_getvar (parser, "location");
330
            ICECAST_LOG_INFO("redirect received %s", uri);
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
            if (strncmp (uri, "http://", 7) != 0)
                break;
            uri += 7;
            mountpoint = strchr (uri, '/');
            free (mount);
            if (mountpoint)
                mount = strdup (mountpoint);
            else
                mount = strdup ("/");

            len = strcspn (uri, ":/");
            port = 80;
            if (uri [len] == ':')
                port = atoi (uri+len+1);
            free (server);
            server = calloc (1, len+1);
            strncpy (server, uri, len);
            connection_close (con);
            httpp_destroy (parser);
350
351
352
            con = NULL;
            parser = NULL;
        }
353
354
355
356
357
358
        else
        {
            client_t *client = NULL;

            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
            {
359
                ICECAST_LOG_ERROR("Error from relay request: %s (%s)", relay->config->localmount,
360
361
362
363
364
365
366
367
368
369
370
371
372
373
                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                break;
            }
            global_lock ();
            if (client_create (&client, con, parser) < 0)
            {
                global_unlock ();
                /* make sure only the client_destory frees these */
                con = NULL;
                parser = NULL;
                client_destroy (client);
                break;
            }
            global_unlock ();
374
            sock_set_blocking (streamsock, 0);
375
            client_set_queue (client, NULL);
376
            client_complete(client);
377
378
            free (server);
            free (mount);
379
            free (server_id);
380
381
382
383
384
385
386
387
388
            free (auth_header);

            return client;
        }
        redirects++;
    }
    /* failed, better clean up */
    free (server);
    free (mount);
389
    free (server_id);
390
391
392
393
394
395
396
397
398
399
400
401
402
403
    free (auth_header);
    if (con)
        connection_close (con);
    if (parser)
        httpp_destroy (parser);
    return NULL;
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
static void *start_relay_stream (void *arg)
{
404
    relay_t *relay = arg;
405
406
407
    source_t *src = relay->source;
    client_t *client;

408
    ICECAST_LOG_INFO("Starting relayed source at mountpoint \"%s\"", relay->config->localmount);
409
410
    do
    {
411
412
413
414
415
416
417
418
419
420
421
422
423
424
        size_t i;

        for (i = 0; i < relay->config->upstreams; i++) {
            ICECAST_LOG_DEBUG("For relay on mount \"%s\", trying upstream #%zu", relay->config->localmount, i);
            client = open_relay_connection(relay, &(relay->config->upstream[i]));
            if (client)
                break;
        }

        /* if we have no upstreams defined, use the default upstream */
        if (!relay->config->upstreams) {
            ICECAST_LOG_DEBUG("For relay on mount \"%s\" with no upstreams trying upstream default", relay->config->localmount);
            client = open_relay_connection(relay, NULL);
        }
425
426
427
428
429
430
431

        if (client == NULL)
            continue;

        src->client = client;
        src->parser = client->parser;
        src->con = client->con;
432
433

        if (connection_complete_source (src, 0) < 0)
434
        {
435
            ICECAST_LOG_INFO("Failed to complete source initialisation");
436
437
438
            client_destroy (client);
            src->client = NULL;
            continue;
439
        }
440
        stats_event_inc(NULL, "source_relay_connections");
441
        stats_event (relay->config->localmount, "source_ip", client->con->ip);
442

443
444
        source_main (relay->source);

445
        if (relay->config->on_demand == 0)
446
447
        {
            /* only keep refreshing YP entries for inactive on-demand relays */
448
            yp_remove (relay->config->localmount);
449
            relay->source->yp_public = -1;
450
            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
451
            slave_update_all_mounts();
452
453
        }

454
        /* we've finished, now get cleaned up */
455
        relay->cleanup = 1;
456
        slave_rebuild_mounts();
457
458

        return NULL;
459
    } while (0); /* TODO allow looping through multiple servers */
460

461
462
463
464
    if (relay->source->fallback_mount)
    {
        source_t *fallback_source;

465
        ICECAST_LOG_DEBUG("failed relay, fallback to %s", relay->source->fallback_mount);
466
        avl_tree_rlock(global.source_tree);
467
        fallback_source = source_find_mount(relay->source->fallback_mount);
468
469

        if (fallback_source != NULL)
470
            source_move_clients(relay->source, fallback_source);
471

472
        avl_tree_unlock(global.source_tree);
473
474
    }

475
    source_clear_source(relay->source);
476

477
    /* cleanup relay, but prevent this relay from starting up again too soon */
478
479
    thread_mutex_lock(&_slave_mutex);
    thread_mutex_lock(&(config_locks()->relay_lock));
Karl Heyes's avatar
Karl Heyes committed
480
    relay->source->on_demand = 0;
481
    relay->start = time(NULL) + max_interval;
482
    relay->cleanup = 1;
483
484
    thread_mutex_unlock(&(config_locks()->relay_lock));
    thread_mutex_unlock(&_slave_mutex);
485
486

    return NULL;
487
488
489
490
}


/* wrapper for starting the provided relay stream */
491
static void check_relay_stream (relay_t *relay)
492
493
494
{
    if (relay->source == NULL)
    {
495
        if (relay->config->localmount[0] != '/')
496
        {
497
            ICECAST_LOG_WARN("relay mountpoint \"%s\" does not start with /, skipping",
498
                    relay->config->localmount);
499
500
            return;
        }
501
        /* new relay, reserve the name */
502
        relay->source = source_reserve (relay->config->localmount);
503
        if (relay->source)
504
        {
505
506
            ICECAST_LOG_DEBUG("Adding relay source at mountpoint \"%s\"", relay->config->localmount);
            if (relay->config->on_demand)
Karl Heyes's avatar
Karl Heyes committed
507
508
            {
                ice_config_t *config = config_get_config ();
509
510
                mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
                relay->source->on_demand = relay->config->on_demand;
Karl Heyes's avatar
Karl Heyes committed
511
512
513
                if (mountinfo == NULL)
                    source_update_settings (config, relay->source, mountinfo);
                config_release_config ();
514
                stats_event (relay->config->localmount, "listeners", "0");
515
                slave_update_all_mounts();
Karl Heyes's avatar
Karl Heyes committed
516
            }
517
        }
518
        else
519
520
521
        {
            if (relay->start == 0)
            {
522
                ICECAST_LOG_WARN("new relay but source \"%s\" already exists", relay->config->localmount);
523
524
525
526
                relay->start = 1;
            }
            return;
        }
527
    }
528
    do
529
    {
530
        source_t *source = relay->source;
531
532
        /* skip relay if active, not configured or just not time yet */
        if (relay->source == NULL || relay->running || relay->start > time(NULL))
533
            break;
534
        /* check if an inactive on-demand relay has a fallback that has listeners */
535
        if (relay->config->on_demand && source->on_demand_req == 0)
536
        {
537
            relay->source->on_demand = relay->config->on_demand;
538
539
540
541
542
543
544
545

            if (source->fallback_mount && source->fallback_override)
            {
                source_t *fallback;
                avl_tree_rlock (global.source_tree);
                fallback = source_find_mount (source->fallback_mount);
                if (fallback && fallback->running && fallback->listeners)
                {
546
                   ICECAST_LOG_DEBUG("fallback running %d with %lu listeners", fallback->running, fallback->listeners);
547
548
549
550
551
552
553
554
                   source->on_demand_req = 1;
                }
                avl_tree_unlock (global.source_tree);
            }
            if (source->on_demand_req == 0)
                break;
        }

555
        relay->start = time(NULL) + 5;
556
        relay->running = 1;
557
558
559
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
560

561
    } while (0);
562
    /* the relay thread may of shut down itself */
563
    if (relay->cleanup)
564
    {
565
566
        if (relay->thread)
        {
567
            ICECAST_LOG_DEBUG("waiting for relay thread for \"%s\"", relay->config->localmount);
568
569
570
            thread_join (relay->thread);
            relay->thread = NULL;
        }
571
572
        relay->cleanup = 0;
        relay->running = 0;
573

574
        if (relay->config->on_demand && relay->source)
575
576
        {
            ice_config_t *config = config_get_config ();
577
            mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
578
579
            source_update_settings (config, relay->source, mountinfo);
            config_release_config ();
580
            stats_event (relay->config->localmount, "listeners", "0");
581
        }
Michael Smith's avatar
Michael Smith committed
582
    }
583
584
}

Michael Smith's avatar
Michael Smith committed
585

586
587
588
/* compare the 2 relays to see if there are any changes, return 1 if
 * the relay needs to be restarted, 0 otherwise
 */
589
590
591
#define _EQ_STR(a,b) (((a) == (b)) || ((a) != NULL && (b) != NULL && strcmp((a), (b)) == 0))
#define _EQ_ATTR(x) (_EQ_STR((new->x), (old->x)))
static int relay_has_changed_upstream(const relay_config_upstream_t *new, const relay_config_upstream_t *old)
592
{
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
    if (new->mp3metadata != old->mp3metadata)
        return 1;

    if (!_EQ_ATTR(server) || new->port != old->port)
        return 1;

    if (!_EQ_ATTR(mount))
        return 1;

/* NOTE: We currently do not consider this a relevant change. Why?
    if (!_EQ_ATTR(username) || !_EQ_ATTR(password))
        return 1;

    if (!_EQ_ATTR(bind))
        return 1;
*/

    return 0;
}

static int relay_has_changed (const relay_config_t *new, relay_config_t *old)
{
    size_t i;

    /* This is not fully true: If more upstreams has been added there is no reason
     * to restart the relay. However for now we ignore this case. TODO: Change this.
     */
    if (new->upstreams != old->upstreams)
        return 1;

    for (i = 0; i < new->upstreams; i++) {
        if (relay_has_changed_upstream(&(new->upstream[i]), &(old->upstream[i])))
            return 1;
    }

    if (relay_has_changed_upstream(&(new->upstream_default), &(old->upstream_default)))
        return 1;

    /* Why do we do this here? */
    old->on_demand = new->on_demand;

    return 0;
635
636
637
}


638
639
640
641
/* go through updated looking for relays that are different configured. The
 * returned list contains relays that should be kept running, current contains
 * the list of relays to shutdown
 */
642
643
static relay_t *
update_relay_set(relay_t **current, relay_config_t **updated, size_t updated_length)
644
{
645
646
647
648
649
650
651
    relay_config_t *relay;
    relay_t *existing_relay, **existing_p;
    relay_t *new_list = NULL;
    size_t i;

    for (i = 0; i < updated_length; i++) {
        relay = updated[i];
652

653
654
655
656
657
658
        existing_relay = *current;
        existing_p = current;

        while (existing_relay)
        {
            /* break out if keeping relay */
659
660
            if (strcmp(relay->localmount, existing_relay->config->localmount) == 0)
                if (relay_has_changed(relay, existing_relay->config) == 0)
661
                    break;
662
            existing_p = &existing_relay->next;
663

664
665
            existing_relay = existing_relay->next;
        }
666
667


668
669
670
        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
671
            existing_relay = relay_new(relay);
672
673
674
675
676
677
678
        }
        else
        {
            *existing_p = existing_relay->next;
        }
        existing_relay->next = new_list;
        new_list = existing_relay;
679
    }
680

681
682
683
684
685
686
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
687
 * are separated and returned in a separate list
688
 */
689
690
static relay_t *
update_relays (relay_t **relay_list, relay_config_t **new_relay_list, size_t new_relay_list_length)
691
{
692
    relay_t *active_relays, *cleanup_relays;
693

694
    active_relays = update_relay_set(relay_list, new_relay_list, new_relay_list_length);
695

696
697
698
699
700
701
702
703
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


704
705
static void relay_check_streams (relay_t *to_start,
        relay_t *to_free, int skip_timer)
706
{
707
    relay_t *relay;
708
709

    while (to_free)
710
    {
711
        if (to_free->source)
712
        {
713
714
715
            if (to_free->running)
            {
                /* relay has been removed from xml, shut down active relay */
716
                ICECAST_LOG_DEBUG("source shutdown request on \"%s\"", to_free->config->localmount);
717
                to_free->running = 0;
718
719
720
721
                to_free->source->running = 0;
                thread_join (to_free->thread);
            }
            else
722
                stats_event (to_free->config->localmount, NULL, NULL);
723
        }
724
725
726
727
728
729
        to_free = relay_free (to_free);
    }

    relay = to_start;
    while (relay)
    {
730
731
        if (skip_timer)
            relay->start = 0;
732
733
        check_relay_stream (relay);
        relay = relay->next;
734
    }
Michael Smith's avatar
Michael Smith committed
735
736
}

737
738
739
740
741

static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username= NULL;
    int port;
742
    sock_t mastersock;
743
    int ret = 0;
744
    char buf[256];
745
746
747
    do
    {
        char *authheader, *data;
748
749
750
        relay_t *cleanup_relays;
        relay_config_t **new_relays = NULL;
        size_t new_relays_length = 0;
751
        int len, count = 1;
752
        int on_demand;
753
        size_t i;
754

755
        username = strdup(config->master_username);
756
        if (config->master_password)
757
            password = strdup(config->master_password);
Michael Smith's avatar
Michael Smith committed
758

759
        if (config->master_server)
760
            master = strdup(config->master_server);
Michael Smith's avatar
Michael Smith committed
761

762
        port = config->master_server_port;
Michael Smith's avatar
Michael Smith committed
763

764
765
        if (password == NULL || master == NULL || port == 0)
            break;
766
        on_demand = config->on_demand;
767
768
        ret = 1;
        config_release_config();
769
        mastersock = sock_connect_wto(master, port, 10);
770

771
772
        if (mastersock == SOCK_ERROR)
        {
773
            ICECAST_LOG_WARN("Relay slave failed to contact master server to fetch stream list");
774
775
            break;
        }
Michael Smith's avatar
Michael Smith committed
776

777
778
779
        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        snprintf (authheader, len, "%s:%s", username, password);
780
        data = util_base64_encode(authheader, len);
781
782
783
784
785
786
787
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(authheader);
        free(data);

788
        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
789
                ((strncmp (buf, "HTTP/1.0 200", 12) != 0) && (strncmp (buf, "HTTP/1.1 200", 12) != 0)))
790
791
        {
            sock_close (mastersock);
792
            ICECAST_LOG_WARN("Master rejected streamlist request");
793
            break;
794
795
        } else {
            ICECAST_LOG_INFO("Master accepted streamlist request");
796
797
        }

798
799
800
801
802
803
804
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
805
806
807
            relay_config_t *c = NULL;
            relay_config_t **n;

808
809
            if (!strlen(buf))
                continue;
810
            ICECAST_LOG_DEBUG("read %d from master \"%s\"", count++, buf);
811
812
            xmlURIPtr parsed_uri = xmlParseURI(buf);
            if (parsed_uri == NULL) {
813
                ICECAST_LOG_DEBUG("Error while parsing line from master. Ignoring line.");
814
815
                continue;
            }
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835

            n = realloc(new_relays, sizeof(*new_relays)*(new_relays_length + 1));
            if (n) {
                new_relays = n;

                c = calloc(1, sizeof(*c));
                new_relays[new_relays_length++] = c;
            }

            if (c) {
                if (parsed_uri->server != NULL) {
                    c->upstream_default.server = strdup(parsed_uri->server);
                    if (parsed_uri->port == 0) {
                        c->upstream_default.port = 80;
                    } else {
                        c->upstream_default.port = parsed_uri->port;
                    }
                } else {
                    c->upstream_default.server = (char *)xmlCharStrdup (master);
                    c->upstream_default.port = port;
836
837
                }

838
839
840
841
842
                c->upstream_default.mount = strdup(parsed_uri->path);
                c->localmount = strdup(parsed_uri->path);
                c->upstream_default.mp3metadata = 1;
                c->on_demand = on_demand;
                ICECAST_LOG_DEBUG("Added relay host=\"%s\", port=%d, mount=\"%s\"", c->upstream_default.server, c->upstream_default.port, c->upstream_default.mount);
843
            }
844
            xmlFreeURI(parsed_uri);
Michael Smith's avatar
Michael Smith committed
845
        }
846
847
        sock_close (mastersock);

848
        thread_mutex_lock (&(config_locks()->relay_lock));
849
        cleanup_relays = update_relays (&global.master_relays, new_relays, new_relays_length);
850

851
        relay_check_streams (global.master_relays, cleanup_relays, 0);
852
853
854
855
856

        for (i = 0; i < new_relays_length; i++) {
            relay_config_free(new_relays[i]);
        }
        free(new_relays);
857
858
859

        thread_mutex_unlock (&(config_locks()->relay_lock));

860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
    } while(0);

    if (master)
        free (master);
    if (username)
        free (username);
    if (password)
        free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
876
    unsigned int interval = 0;
Michael Smith's avatar
Michael Smith committed
877

878
879
    (void)arg;

880
    thread_mutex_lock(&_slave_mutex);
881
    update_settings = 0;
882
    update_all_mounts = 0;
883
    thread_mutex_unlock(&_slave_mutex);
884

Karl Heyes's avatar
Karl Heyes committed
885
    config = config_get_config();
886
    stats_global(config);
Karl Heyes's avatar
Karl Heyes committed
887
    config_release_config();
888
    source_recheck_mounts(1);
889

890
    while (1)
891
    {
892
        relay_t *cleanup_relays = NULL;
893
        int skip_timer = 0;
894

895
        /* re-read xml file if requested */
896
        global_lock();
897
898
        if (global.schedule_config_reread) {
            config_reread_config();
899
            global.schedule_config_reread = 0;
900
        }
901
        global_unlock();
902

903
        thread_sleep(1000000);
904
905
906
        thread_mutex_lock(&_slave_mutex);
        if (slave_running == 0) {
            thread_mutex_unlock(&_slave_mutex);
907
            break;
908
909
        }
        thread_mutex_unlock(&_slave_mutex);
910
911

        ++interval;
912

913
        /* only update relays lists when required */
914
        thread_mutex_lock(&_slave_mutex);
915
916
        if (max_interval <= interval)
        {
917
            ICECAST_LOG_DEBUG("checking master stream list");
918
            config = config_get_config();
Michael Smith's avatar
Michael Smith committed
919

920
921
            if (max_interval == 0)
                skip_timer = 1;
922
923
            interval = 0;
            max_interval = config->master_update_interval;
924
            thread_mutex_unlock(&_slave_mutex);
Michael Smith's avatar
Michael Smith committed
925

926
927
928
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
929

930
            thread_mutex_lock (&(config_locks()->relay_lock));
931

932
            cleanup_relays = update_relays(&global.relays, config->relay, config->relay_length);
933

934
935
936
            config_release_config();
        }
        else
937
938
        {
            thread_mutex_unlock(&_slave_mutex);
939
            thread_mutex_lock (&(config_locks()->relay_lock));
940
        }
941
942
943
944
945

        relay_check_streams (global.relays, cleanup_relays, skip_timer);
        relay_check_streams (global.master_relays, NULL, skip_timer);
        thread_mutex_unlock (&(config_locks()->relay_lock));

946
        thread_mutex_lock(&_slave_mutex);
947
948
        if (update_settings)
        {
949
            source_recheck_mounts (update_all_mounts);
950
            update_settings = 0;
951
            update_all_mounts = 0;
952
        }
953
        thread_mutex_unlock(&_slave_mutex);
954
    }
955
    ICECAST_LOG_INFO("shutting down current relays");
956
957
    relay_check_streams (NULL, global.relays, 0);
    relay_check_streams (NULL, global.master_relays, 0);
958

959
    ICECAST_LOG_INFO("Slave thread shutdown complete");
960

961
    return NULL;
962
}
Michael Smith's avatar
Michael Smith committed
963