slave.c 14.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org, 
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14
15
16
17
18
19
20
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

21
22
23
24
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

#include "os.h"

Karl Heyes's avatar
Karl Heyes committed
44
45
46
47
#include "thread/thread.h"
#include "avl/avl.h"
#include "net/sock.h"
#include "httpp/httpp.h"
48

49
#include "cfgfile.h"
50
51
52
53
54
55
56
57
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "source.h"
Michael Smith's avatar
Michael Smith committed
58
#include "format.h"
59
60
61
62

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
63
static thread_type *_slave_thread_id;
64
static int slave_running = 0;
65
66
volatile static unsigned int max_interval = 0;
volatile static int rescan_relays = 0;
67

68
relay_server *relay_free (relay_server *relay)
69
{
70
71
72
73
74
75
76
    relay_server *next = relay->next;
    DEBUG1("freeing relay %s", relay->localmount);
    if (relay->source)
       source_free_source (relay->source);
    xmlFree (relay->server);
    xmlFree (relay->mount);
    xmlFree (relay->localmount);
77
78
79
80
    if (relay->username)
        xmlFree (relay->username);
    if (relay->password)
        xmlFree (relay->password);
81
    free (relay);
82
    return next;
83
84
}

85

86
87
88
relay_server *relay_copy (relay_server *r)
{
    relay_server *copy = calloc (1, sizeof (relay_server));
Michael Smith's avatar
Michael Smith committed
89

90
    if (copy)
Michael Smith's avatar
Michael Smith committed
91
    {
92
93
94
        copy->server = xmlStrdup (r->server);
        copy->mount = xmlStrdup (r->mount);
        copy->localmount = xmlStrdup (r->localmount);
95
96
97
98
        if (r->username)
            copy->username = xmlStrdup (r->username);
        if (r->password)
            copy->password = xmlStrdup (r->password);
99
100
        copy->port = r->port;
        copy->mp3metadata = r->mp3metadata;
Michael Smith's avatar
Michael Smith committed
101
    }
102
103
    return copy;
}
104

105

106
107
108
109
/* force a recheck of the relays. This will recheck the master server if
 * a this is a slave.
 */
void slave_recheck (void)
110
{
111
    max_interval = 0;
112
113
}

114
115
116
117
/* rescan the current relays to see if any need starting or if any
 * relay threads have terminated
 */
void slave_rescan (void)
118
{
119
    rescan_relays = 1;
120
121
}

122
123

void slave_initialize(void)
Michael Smith's avatar
Michael Smith committed
124
{
125
126
    if (slave_running)
        return;
Michael Smith's avatar
Michael Smith committed
127

128
    slave_running = 1;
129
    max_interval = 0;
130
131
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
132

Michael Smith's avatar
Michael Smith committed
133

134
135
136
void slave_shutdown(void)
{
    if (!slave_running)
137
        return;
138
    slave_running = 0;
139
    DEBUG0 ("waiting for slave thread");
140
141
142
143
144
145
146
    thread_join (_slave_thread_id);
}


/* This does the actual connection for a relay. A thread is
 * started off if a connection can be acquired
 */
147
static void *start_relay_stream (void *arg)
148
{
149
    relay_server *relay = arg;
150
151
152
153
154
155
    sock_t streamsock = SOCK_ERROR;
    source_t *src = relay->source;
    http_parser_t *parser = NULL;
    connection_t *con=NULL;
    char header[4096];

156
    relay->running = 1;
157
158
159
    INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
    do
    {
160
161
        char *auth_header;

162
163
164
165
166
167
        streamsock = sock_connect_wto (relay->server, relay->port, 30);
        if (streamsock == SOCK_ERROR)
        {
            WARN3("Failed to relay stream from master server, couldn't connect to http://%s:%d%s",
                    relay->server, relay->port, relay->mount);
            break;
Michael Smith's avatar
Michael Smith committed
168
        }
169
170
        con = create_connection (streamsock, -1, NULL);

171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
        if (relay->username && relay->password)
        {
            char *esc_authorisation;
            unsigned len = strlen(relay->username) + strlen(relay->password) + 2;

            auth_header = malloc (len);
            snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
            esc_authorisation = util_base64_encode(auth_header);
            free(auth_header);
            len = strlen (esc_authorisation) + 24;
            auth_header = malloc (len);
            snprintf (auth_header, len,
                    "Authorization: Basic %s\r\n", esc_authorisation);
            free(esc_authorisation);
        }
        else
        {
            auth_header = strdup ("");
        }

191
192
193
194
195
196
197
198
        /* At this point we may not know if we are relaying an mp3 or vorbis
         * stream, but only send the icy-metadata header if the relay details
         * state so (the typical case).  It's harmless in the vorbis case. If
         * we don't send in this header then relay will not have mp3 metadata.
         */
        sock_write(streamsock, "GET %s HTTP/1.0\r\n"
                "User-Agent: " ICECAST_VERSION_STRING "\r\n"
                "%s"
199
                "%s"
200
                "\r\n",
201
202
203
204
                relay->mount,
                relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
                auth_header);
        free (auth_header);
205
        memset (header, 0, sizeof(header));
206
        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
207
208
209
210
211
212
213
214
        {
            WARN0("Header read failed");
            break;
        }
        parser = httpp_create_parser();
        httpp_initialize (parser, NULL);
        if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
        {
Michael Smith's avatar
Michael Smith committed
215
            ERROR0("Error parsing relay request");
216
217
218
219
220
221
222
223
224
225
226
227
228
229
            break;
        }
        if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
        {
            ERROR1("Error from relay request: %s", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
            break;
        }
        src->parser = parser;
        src->con = con;
        if (connection_complete_source (src) < 0)
        {
            DEBUG0("Failed to complete source initialisation");
            break;
        }
230
        stats_event_inc(NULL, "source_relay_connections");
231

232
233
234
235
236
237
238
        source_main (relay->source);

        /* initiate an immediate relay cleanup run */
        relay->cleanup = 1;
        slave_rescan();

        return NULL;
239
240
241
242
243
244
245
246
247
248
    } while (0);

    if (con == NULL && streamsock != SOCK_ERROR)
        sock_close (streamsock);
    if (con)
        connection_close (con);
    src->con = NULL;
    if (parser)
        httpp_destroy (parser);
    src->parser = NULL;
249
    source_clear_source (relay->source);
250
251
252
253
254
255

    /* initiate an immediate relay cleanup run */
    relay->cleanup = 1;
    slave_rescan();

    return NULL;
256
257
258
259
260
261
262
263
}


/* wrapper for starting the provided relay stream */
static void check_relay_stream (relay_server *relay)
{
    if (relay->source == NULL)
    {
264
265
266
267
268
269
        if (relay->localmount[0] != '/')
        {
            WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
                    relay->localmount);
            return;
        }
270
271
        /* new relay, reserve the name */
        relay->source = source_reserve (relay->localmount);
272
273
274
275
        if (relay->source)
            DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
        else
            WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
276
277
278
    }
    if (relay->source && !relay->running)
    {
279
280
281
282
283
284
285
286
287
288
289
290
        relay->thread = thread_create ("Relay Thread", start_relay_stream,
                relay, THREAD_ATTACHED);
        return;
    }
    /* the relay thread may of close down */
    if (relay->cleanup && relay->thread)
    {
        DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
        thread_join (relay->thread);
        relay->thread = NULL;
        relay->cleanup = 0;
        relay->running = 0;
Michael Smith's avatar
Michael Smith committed
291
    }
292
293
}

Michael Smith's avatar
Michael Smith committed
294

295
296
297
298
299
300
301
302
303
304
305
306
307
/* go through updated looking for relays that are different configured. The
 * returned list contains relays that should be kept running, current contains
 * the list of relays to shutdown
 */
static relay_server *
update_relay_set (relay_server **current, relay_server *updated)
{
    relay_server *relay = updated;
    relay_server *existing_relay, **existing_p;
    relay_server *new_list = NULL;

    while (relay)
    {
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
        existing_relay = *current;
        existing_p = current;

        while (existing_relay)
        {
            /* break out if keeping relay */
            if (strcmp (relay->localmount, existing_relay->localmount) == 0)
                break;
            existing_p = &existing_relay->next;
            existing_relay = existing_relay->next;
        }
        if (existing_relay == NULL)
        {
            /* new one, copy and insert */
            existing_relay = relay_copy (relay);
        }
        else
        {
            *existing_p = existing_relay->next;
        }
        existing_relay->next = new_list;
        new_list = existing_relay;
        relay = relay->next;
331
    }
332
333
334
335
336
337
    return new_list;
}


/* update the relay_list with entries from new_relay_list. Any new relays
 * are added to the list, and any not listed in the provided new_relay_list
338
 * are separated and returned in a separate list
339
 */
340
static relay_server *
341
342
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
343
    relay_server *active_relays, *cleanup_relays;
344

345
    active_relays = update_relay_set (relay_list, new_relay_list);
346

347
348
349
350
351
352
353
354
355
356
357
358
359
    cleanup_relays = *relay_list;
    /* re-assign new set */
    *relay_list = active_relays;

    return cleanup_relays;
}


static void relay_check_streams (relay_server *to_start, relay_server *to_free)
{
    relay_server *relay;

    while (to_free)
360
    {
361
        if (to_free->running && to_free->source)
362
        {
363
364
365
            DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
            to_free->source->running = 0;
            thread_join (to_free->thread);
366
        }
367
368
369
370
371
372
373
374
        to_free = relay_free (to_free);
    }

    relay = to_start;
    while (relay)
    {
        check_relay_stream (relay);
        relay = relay->next;
375
    }
Michael Smith's avatar
Michael Smith committed
376
377
}

378
379
380
381
382

static int update_from_master(ice_config_t *config)
{
    char *master = NULL, *password = NULL, *username= NULL;
    int port;
383
    sock_t mastersock;
384
    int ret = 0;
385
    char buf[256];
386
387
388
    do
    {
        char *authheader, *data;
389
        relay_server *new_relays = NULL, *cleanup_relays;
390
        int len, count = 1;
391

392
393
394
        username = strdup ("relay");
        if (config->master_password)
            password = strdup (config->master_password);
Michael Smith's avatar
Michael Smith committed
395

396
397
        if (config->master_server)
            master = strdup (config->master_server);
Michael Smith's avatar
Michael Smith committed
398

399
        port = config->master_server_port;
Michael Smith's avatar
Michael Smith committed
400

401
402
403
404
405
        if (password == NULL || master == NULL || port == 0)
            break;
        ret = 1;
        config_release_config();
        mastersock = sock_connect_wto (master, port, 0);
406

407
408
409
410
411
        if (mastersock == SOCK_ERROR)
        {
            WARN0("Relay slave failed to contact master server to fetch stream list");
            break;
        }
Michael Smith's avatar
Michael Smith committed
412

413
414
415
        len = strlen(username) + strlen(password) + 2;
        authheader = malloc(len);
        snprintf (authheader, len, "%s:%s", username, password);
416
417
418
419
420
421
422
423
        data = util_base64_encode(authheader);
        sock_write (mastersock,
                "GET /admin/streamlist.txt HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
        free(authheader);
        free(data);

424
425
426
427
428
429
430
431
        if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 ||
                strncmp (buf, "HTTP/1.0 200", 12) != 0)
        {
            sock_close (mastersock);
            WARN0 ("Master rejected streamlist request");
            break;
        }

432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            if (!strlen(buf))
                break;
        }
        while (sock_read_line(mastersock, buf, sizeof(buf)))
        {
            relay_server *r;
            if (!strlen(buf))
                continue;
            DEBUG2 ("read %d from master \"%s\"", count++, buf);
            r = calloc (1, sizeof (relay_server));
            if (r)
            {
                r->server = xmlStrdup (master);
                r->port = port;
                r->mount = xmlStrdup (buf);
                r->localmount = xmlStrdup (buf);
                r->mp3metadata = 1;
451
452
                r->next = new_relays;
                new_relays = r;
453
            }
Michael Smith's avatar
Michael Smith committed
454
        }
455
456
        sock_close (mastersock);

457
458
459
460
461
462
463
464
        thread_mutex_lock (&(config_locks()->relay_lock));
        cleanup_relays = update_relays (&global.master_relays, new_relays);
        
        relay_check_streams (global.master_relays, cleanup_relays);
        relay_check_streams (NULL, new_relays);

        thread_mutex_unlock (&(config_locks()->relay_lock));

465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
    } while(0);

    if (master)
        free (master);
    if (username)
        free (username);
    if (password)
        free (password);

    return ret;
}


static void *_slave_thread(void *arg)
{
    ice_config_t *config;
481
    unsigned int interval = 0;
Michael Smith's avatar
Michael Smith committed
482

483
    while (1)
484
    {
485
486
        relay_server *cleanup_relays;

487
        thread_sleep (1000000);
488
489
        if (slave_running == 0)
            break;
490
        if (rescan_relays == 0 && max_interval > ++interval)
491
492
            continue;

493
494
495
496
497
        /* only update relays lists when required */
        if (max_interval <= interval)
        {
            DEBUG0 ("checking master stream list");
            config = config_get_config();
Michael Smith's avatar
Michael Smith committed
498

499
500
            interval = 0;
            max_interval = config->master_update_interval;
Michael Smith's avatar
Michael Smith committed
501

502
503
504
            /* the connection could take some time, so the lock can drop */
            if (update_from_master (config))
                config = config_get_config();
505

506
            thread_mutex_lock (&(config_locks()->relay_lock));
507

508
            cleanup_relays = update_relays (&global.relays, config->relay);
509

510
            config_release_config();
511

512
513
514
515
            relay_check_streams (global.relays, cleanup_relays);
            thread_mutex_unlock (&(config_locks()->relay_lock));
        }
        else
516
        {
517
518
519
520
521
            DEBUG0 ("rescanning relay lists");
            thread_mutex_lock (&(config_locks()->relay_lock));
            relay_check_streams (global.master_relays, NULL);
            relay_check_streams (global.relays, NULL);
            thread_mutex_unlock (&(config_locks()->relay_lock));
Michael Smith's avatar
Michael Smith committed
522
        }
523
        rescan_relays = 0;
524
    }
525
526
527
528
    DEBUG0 ("shutting down current relays");
    relay_check_streams (NULL, global.relays);
    relay_check_streams (NULL, global.master_relays);

529
530
    INFO0 ("Slave thread shutdown complete");

531
    return NULL;
532
}
Michael Smith's avatar
Michael Smith committed
533