connection.c 48.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
Philipp Schafft's avatar
Philipp Schafft committed
11
12
 * Copyright 2011,      Dave 'justdave' Miller <justdave@mozilla.com>,
 * Copyright 2011-2014, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
13
14
 */

15
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
16
17
18
19
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
20
21
#include <stdio.h>
#include <stdlib.h>
22
#include <errno.h>
Jack Moffitt's avatar
Jack Moffitt committed
23
#include <string.h>
24
25
26
#ifdef HAVE_POLL
#include <sys/poll.h>
#endif
27
28
#include <sys/types.h>
#include <sys/stat.h>
29
30

#ifndef _WIN32
Jack Moffitt's avatar
Jack Moffitt committed
31
32
#include <sys/socket.h>
#include <netinet/in.h>
33
#else
34
#include <winsock2.h>
35
36
#define snprintf _snprintf
#define strcasecmp stricmp
37
#define strncasecmp strnicmp
38
#endif
Jack Moffitt's avatar
Jack Moffitt committed
39

40
#include "compat.h"
Jack Moffitt's avatar
Jack Moffitt committed
41

Karl Heyes's avatar
Karl Heyes committed
42
43
44
45
#include "thread/thread.h"
#include "avl/avl.h"
#include "net/sock.h"
#include "httpp/httpp.h"
Jack Moffitt's avatar
Jack Moffitt committed
46

47
#include "cfgfile.h"
Jack Moffitt's avatar
Jack Moffitt committed
48
49
50
51
52
53
54
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
55
#include "xslt.h"
56
#include "fserve.h"
57
#include "sighandler.h"
58
59

#include "yp.h"
Jack Moffitt's avatar
Jack Moffitt committed
60
#include "source.h"
Michael Smith's avatar
Michael Smith committed
61
#include "format.h"
62
#include "format_mp3.h"
Michael Smith's avatar
Michael Smith committed
63
#include "event.h"
64
#include "admin.h"
Michael Smith's avatar
Michael Smith committed
65
#include "auth.h"
Jack Moffitt's avatar
Jack Moffitt committed
66
67
68

#define CATMODULE "connection"

69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/* Two different major types of source authentication.
   Shoutcast style is used only by the Shoutcast DSP
   and is a crazy version of HTTP.  It looks like :
     Source Client -> Connects to port + 1
     Source Client -> sends encoder password (plaintext)\r\n
     Icecast -> reads encoder password, if ok, sends OK2\r\n, else disconnects
     Source Client -> reads OK2\r\n, then sends http-type request headers
                      that contain the stream details (icy-name, etc..)
     Icecast -> reads headers, stores them
     Source Client -> starts sending MP3 data
     Source Client -> periodically updates metadata via admin.cgi call

   Icecast auth style uses HTTP and Basic Authorization.
*/

84
85
86
87
88
/* A client that has been accepted but whose request has not been fully
 * processed yet; nodes are chained on the request and connection queues.
 */
typedef struct client_queue_tag
{
    client_t *client;               /* the client being set up */
    int offset;                     /* bytes of request data stored in client->refbuf so far */
    int stream_offset;              /* offset of the first byte following the request headers */
    int shoutcast;                  /* set to 1 when the listening socket is shoutcast compatible */
    char *shoutcast_mount;          /* strdup'd copy of the listener's shoutcast mount, or NULL */
    struct client_queue_tag *next;  /* next node on the queue, NULL at the tail */
} client_queue_t;
Jack Moffitt's avatar
Jack Moffitt committed
92
93

typedef struct _thread_queue_tag {
94
95
    thread_type *thread_id;
    struct _thread_queue_tag *next;
Jack Moffitt's avatar
Jack Moffitt committed
96
97
} thread_queue_t;

98
99
100
101
102
103
104
105
/* In-memory copy of an IP filter file (see recheck_ip_file). */
typedef struct
{
    char *filename;         /* file to load; NULL means no filtering from this cache */
    time_t file_recheck;    /* earliest time at which the file is examined again */
    time_t file_mtime;      /* modification time of the file when it was last loaded */
    avl_tree *contents;     /* tree of heap-allocated lines read from the file, or NULL */
} cache_file_contents;

106
static spin_t _connection_lock; // protects _current_id, _con_queue, _con_queue_tail
107
static volatile unsigned long _current_id = 0;
Jack Moffitt's avatar
Jack Moffitt committed
108
109
static int _initialized = 0;

110
111
static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue;
static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
112
113
114
115
116
static int ssl_ok;
#ifdef HAVE_OPENSSL
static SSL_CTX *ssl_ctx;
#endif

117
/* filtering client connection based on IP */
118
static cache_file_contents banned_ip, allowed_ip;
119

120
rwlock_t _source_shutdown_rwlock;
Jack Moffitt's avatar
Jack Moffitt committed
121

122
static void _handle_connection(void);
Jack Moffitt's avatar
Jack Moffitt committed
123

124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/* Comparison callback for the IP filter trees.
 * 'arg' is unused; 'a' is the address being compared and 'b' the pattern
 * it is compared against. Returns the strcmp() ordering of pattern
 * relative to address, 0 when they are identical.
 */
static int compare_ip (void *arg, void *a, void *b)
{
    (void)arg;
    return strcmp ((const char *)b, (const char *)a);
}


/* Release callback for the IP filter trees: frees one stored entry.
 * Always returns 1.
 */
static int free_filtered_ip (void *entry)
{
    free (entry);
    return 1;
}


Jack Moffitt's avatar
Jack Moffitt committed
140
141
void connection_initialize(void)
{
142
143
    if (_initialized) return;
    
144
    thread_spin_create (&_connection_lock);
145
    thread_mutex_create(&move_clients_mutex);
146
    thread_rwlock_create(&_source_shutdown_rwlock);
147
    thread_cond_create(&global.shutdown_cond);
148
149
150
151
    _req_queue = NULL;
    _req_queue_tail = &_req_queue;
    _con_queue = NULL;
    _con_queue_tail = &_con_queue;
Jack Moffitt's avatar
Jack Moffitt committed
152

153
154
155
156
157
158
    banned_ip.contents = NULL;
    banned_ip.file_mtime = 0;

    allowed_ip.contents = NULL;
    allowed_ip.file_mtime = 0;

159
    _initialized = 1;
Jack Moffitt's avatar
Jack Moffitt committed
160
161
162
163
}

void connection_shutdown(void)
{
164
165
    if (!_initialized) return;
    
166
167
168
#ifdef HAVE_OPENSSL
    SSL_CTX_free (ssl_ctx);
#endif
169
170
171
    if (banned_ip.contents)  avl_tree_free (banned_ip.contents, free_filtered_ip);
    if (allowed_ip.contents) avl_tree_free (allowed_ip.contents, free_filtered_ip);
 
172
    thread_cond_destroy(&global.shutdown_cond);
173
    thread_rwlock_destroy(&_source_shutdown_rwlock);
174
    thread_spin_destroy (&_connection_lock);
175
    thread_mutex_destroy(&move_clients_mutex);
Jack Moffitt's avatar
Jack Moffitt committed
176

177
    _initialized = 0;
Jack Moffitt's avatar
Jack Moffitt committed
178
179
180
181
}

static unsigned long _next_connection_id(void)
{
182
    unsigned long id;
Jack Moffitt's avatar
Jack Moffitt committed
183

184
    thread_spin_lock (&_connection_lock);
185
    id = _current_id++;
186
    thread_spin_unlock (&_connection_lock);
Jack Moffitt's avatar
Jack Moffitt committed
187

188
    return id;
Jack Moffitt's avatar
Jack Moffitt committed
189
190
}

191
192

#ifdef HAVE_OPENSSL
193
/* Create the server SSL context and load the certificate and private key
 * named by config->cert_file (the same file is used for both).
 *
 * On success ssl_ok is set to 1. If the context cannot be created, no
 * certificate file is configured, or the certificate/key cannot be loaded
 * or do not match, ssl_ok stays 0 and a message is logged. An invalid
 * cipher list is only warned about; it does not disable SSL.
 */
static void get_ssl_certificate (ice_config_t *config)
{
    long ssl_opts;

    ssl_ok = 0;

    SSL_load_error_strings();                /* readable error messages */
    SSL_library_init();                      /* initialize library */

    ssl_ctx = SSL_CTX_new (SSLv23_server_method());
    if (ssl_ctx == NULL)
    {
        /* without a context none of the calls below are valid */
        ICECAST_LOG_ERROR("Failed to create SSL context");
        ICECAST_LOG_INFO("No SSL capability on any configured ports");
        return;
    }

    /* keep whatever options are already set, but refuse SSLv2/SSLv3 and,
     * where the library knows the option, TLS compression */
    ssl_opts = SSL_CTX_get_options (ssl_ctx);
#ifdef SSL_OP_NO_COMPRESSION
    SSL_CTX_set_options (ssl_ctx, ssl_opts|SSL_OP_NO_SSLv2|SSL_OP_NO_SSLv3|SSL_OP_NO_COMPRESSION);
#else
    SSL_CTX_set_options (ssl_ctx, ssl_opts|SSL_OP_NO_SSLv2|SSL_OP_NO_SSLv3);
#endif

    do
    {
        if (config->cert_file == NULL)
            break;
        if (SSL_CTX_use_certificate_chain_file (ssl_ctx, config->cert_file) <= 0)
        {
            ICECAST_LOG_WARN("Invalid cert file %s", config->cert_file);
            break;
        }
        if (SSL_CTX_use_PrivateKey_file (ssl_ctx, config->cert_file, SSL_FILETYPE_PEM) <= 0)
        {
            ICECAST_LOG_WARN("Invalid private key file %s", config->cert_file);
            break;
        }
        if (!SSL_CTX_check_private_key (ssl_ctx))
        {
            ICECAST_LOG_ERROR("Invalid %s - Private key does not match cert public key", config->cert_file);
            break;
        }
        if (SSL_CTX_set_cipher_list(ssl_ctx, config->cipher_list) <= 0)
        {
            ICECAST_LOG_WARN("Invalid cipher list: %s", config->cipher_list);
        }
        ssl_ok = 1;
        ICECAST_LOG_INFO("SSL certificate found at %s", config->cert_file);
        ICECAST_LOG_INFO("SSL using ciphers %s", config->cipher_list);
        return;
    } while (0);
    ICECAST_LOG_INFO("No SSL capability on any configured ports");
}


/* handlers for reading and writing a connection_t when there is ssl
 * configured on the listening port
 */
/* Read up to 'len' bytes from the SSL session into 'buf'.
 * Returns what SSL_read() returned when that is zero or more. A negative
 * result caused by SSL_ERROR_WANT_READ/WANT_WRITE yields -1 without
 * flagging the connection; any other negative result sets con->error.
 */
static int connection_read_ssl (connection_t *con, void *buf, size_t len)
{
    int got = SSL_read (con->ssl, buf, len);
    int reason;

    if (got >= 0)
        return got;

    reason = SSL_get_error (con->ssl, got);
    if (reason == SSL_ERROR_WANT_READ || reason == SSL_ERROR_WANT_WRITE)
        return -1;

    con->error = 1;
    return got;
}

/* Write up to 'len' bytes from 'buf' to the SSL session.
 * A non-negative result is added to con->sent_bytes and returned. A
 * negative result caused by SSL_ERROR_WANT_READ/WANT_WRITE yields -1
 * without flagging the connection; any other negative result sets
 * con->error.
 */
static int connection_send_ssl (connection_t *con, const void *buf, size_t len)
{
    int sent = SSL_write (con->ssl, buf, len);
    int reason;

    if (sent >= 0)
    {
        con->sent_bytes += sent;
        return sent;
    }

    reason = SSL_get_error (con->ssl, sent);
    if (reason == SSL_ERROR_WANT_READ || reason == SSL_ERROR_WANT_WRITE)
        return -1;

    con->error = 1;
    return sent;
}
#else

/* SSL not compiled in, so at least log it */
284
/* Build without OpenSSL: there is nothing to load, so just record that SSL
 * is unavailable and say so in the log. 'config' is not used.
 */
static void get_ssl_certificate (ice_config_t *config)
{
    (void)config;

    ssl_ok = 0;
    ICECAST_LOG_INFO("No SSL capability");
}
#endif /* HAVE_OPENSSL */


/* handlers (default) for reading and writing a connection_t, no encryption
 * used, just straight access to the socket
 */
static int connection_read (connection_t *con, void *buf, size_t len)
{
    int got = sock_read_bytes (con->sock, buf, len);

    /* a zero-byte read, or a -1 whose socket error is not recoverable,
     * marks the connection as failed */
    if (got == 0 || (got == -1 && !sock_recoverable (sock_error())))
        con->error = 1;

    return got;
}

static int connection_send (connection_t *con, const void *buf, size_t len)
{
    int sent = sock_write_bytes (con->sock, buf, len);

    if (sent >= 0)
    {
        /* successful writes are accounted on the connection */
        con->sent_bytes += sent;
        return sent;
    }

    /* failed write: only flag the connection for non-recoverable errors */
    if (!sock_recoverable (sock_error()))
        con->error = 1;
    return sent;
}


319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
/* Re-populate the avl tree of IP addresses held in 'cache', which is used
 * to decide whether an incoming connection is to be dropped.
 *
 * The backing file is examined at most once every 10 seconds and is only
 * re-read when its modification time differs from the one recorded at the
 * last successful load. Empty lines and lines starting with '#' are
 * skipped. If no filename is set, any existing tree is released.
 *
 * The modification time is recorded only after the file has been read, so
 * a failure to open the file (or to allocate the new tree) is retried at
 * the next check instead of being skipped until the file changes again.
 * On such a failure the previous contents stay in effect.
 */
static void recheck_ip_file (cache_file_contents *cache)
{
    time_t now = time(NULL);
    struct stat file_stat;
    FILE *file;
    avl_tree *new_ips;
    char line [MAX_LINE_LEN];
    int count = 0;

    if (now < cache->file_recheck)
        return;
    cache->file_recheck = now + 10;

    if (cache->filename == NULL)
    {
        if (cache->contents)
        {
            avl_tree_free (cache->contents, free_filtered_ip);
            cache->contents = NULL;
        }
        return;
    }
    if (stat (cache->filename, &file_stat) < 0)
    {
        ICECAST_LOG_WARN("failed to check status of \"%s\": %s", cache->filename, strerror(errno));
        return;
    }
    if (file_stat.st_mtime == cache->file_mtime)
        return; /* common case, no update to file */

    file = fopen (cache->filename, "r");
    if (file == NULL)
    {
        ICECAST_LOG_WARN("Failed to open file \"%s\": %s", cache->filename, strerror (errno));
        return;
    }

    new_ips = avl_tree_new (compare_ip, NULL);
    if (new_ips == NULL)
    {
        ICECAST_LOG_WARN("Failed to allocate IP list for file \"%s\"", cache->filename);
        fclose (file);
        return;
    }

    while (get_line (file, line, MAX_LINE_LEN))
    {
        char *str;
        if (!line[0] || line[0] == '#')
            continue;
        str = strdup (line);
        if (str == NULL)
            continue;   /* out of memory, entry is not stored so do not count it */
        avl_insert (new_ips, str);
        count++;
    }
    fclose (file);
    cache->file_mtime = file_stat.st_mtime;
    ICECAST_LOG_INFO("%d entries read from file \"%s\"", count, cache->filename);

    /* swap in the freshly built tree */
    if (cache->contents)
        avl_tree_free (cache->contents, free_filtered_ip);
    cache->contents = new_ips;
}


/* return 0 if the passed ip address is not to be handled by icecast, non-zero otherwise */
static int accept_ip_address (char *ip)
{
    void *result;

    recheck_ip_file (&banned_ip);
    recheck_ip_file (&allowed_ip);

    if (banned_ip.contents)
    {
        if (avl_get_by_key (banned_ip.contents, ip, &result) == 0)
        {
393
            ICECAST_LOG_DEBUG("%s is banned", ip);
394
395
396
397
398
399
400
            return 0;
        }
    }
    if (allowed_ip.contents)
    {
        if (avl_get_by_key (allowed_ip.contents, ip, &result) == 0)
        {
401
            ICECAST_LOG_DEBUG("%s is allowed", ip);
402
403
404
405
            return 1;
        }
        else
        {
406
            ICECAST_LOG_DEBUG("%s is not allowed", ip);
407
408
409
410
411
412
413
            return 0;
        }
    }
    return 1;
}


414
415
/* Allocate a zeroed connection_t for an accepted socket.
 * The 'ip' pointer itself is stored in the connection (it is not copied).
 * The plain, non-SSL read/send handlers are installed and a fresh
 * connection id is assigned. Returns NULL if the allocation fails.
 */
connection_t *connection_create (sock_t sock, sock_t serversock, char *ip)
{
    connection_t *con = (connection_t *)calloc (1, sizeof (connection_t));

    if (con == NULL)
        return NULL;

    con->sock = sock;
    con->serversock = serversock;
    con->con_time = time(NULL);
    con->id = _next_connection_id();
    con->ip = ip;
    con->read = connection_read;
    con->send = connection_send;

    return con;
}

432
433
434
435
436
437
438
439
440
441
442
443
444
/* Switch a connection over to SSL: a new SSL object in accept (server)
 * state is bound to the connection's socket and the SSL read/send
 * handlers replace the plain ones. Without OpenSSL support compiled in
 * this does nothing.
 */
void connection_uses_ssl (connection_t *con)
{
#ifdef HAVE_OPENSSL
    con->read = connection_read_ssl;
    con->send = connection_send_ssl;

    con->ssl = SSL_new (ssl_ctx);
    SSL_set_accept_state (con->ssl);
    SSL_set_fd (con->ssl, con->sock);
#endif
}

445
static sock_t wait_for_serversock(int timeout)
446
447
{
#ifdef HAVE_POLL
448
    struct pollfd ufds [global.server_sockets];
449
450
451
452
453
454
455
456
457
458
    int i, ret;

    for(i=0; i < global.server_sockets; i++) {
        ufds[i].fd = global.serversock[i];
        ufds[i].events = POLLIN;
        ufds[i].revents = 0;
    }

    ret = poll(ufds, global.server_sockets, timeout);
    if(ret < 0) {
459
        return SOCK_ERROR;
460
461
    }
    else if(ret == 0) {
462
        return SOCK_ERROR;
463
464
    }
    else {
465
        int dst;
466
        for(i=0; i < global.server_sockets; i++) {
467
            if(ufds[i].revents & POLLIN)
468
                return ufds[i].fd;
469
470
471
472
            if(ufds[i].revents & (POLLHUP|POLLERR|POLLNVAL))
            {
                if (ufds[i].revents & (POLLHUP|POLLERR))
                {
473
                    sock_close (global.serversock[i]);
474
                    ICECAST_LOG_WARN("Had to close a listening socket");
475
                }
476
                global.serversock[i] = SOCK_ERROR;
477
            }
478
        }
479
480
481
        /* remove any closed sockets */
        for(i=0, dst=0; i < global.server_sockets; i++)
        {
482
            if (global.serversock[i] == SOCK_ERROR)
483
484
485
486
487
488
                continue;
            if (i!=dst)
                global.serversock[dst] = global.serversock[i];
            dst++;
        }
        global.server_sockets = dst;
489
        return SOCK_ERROR;
490
491
492
493
494
    }
#else
    fd_set rfds;
    struct timeval tv, *p=NULL;
    int i, ret;
495
    sock_t max = SOCK_ERROR;
496
497
498
499
500

    FD_ZERO(&rfds);

    for(i=0; i < global.server_sockets; i++) {
        FD_SET(global.serversock[i], &rfds);
501
        if (max == SOCK_ERROR || global.serversock[i] > max)
502
503
504
505
506
            max = global.serversock[i];
    }

    if(timeout >= 0) {
        tv.tv_sec = timeout/1000;
507
        tv.tv_usec = (timeout % 1000) * 1000;
508
509
510
511
512
        p = &tv;
    }

    ret = select(max+1, &rfds, NULL, NULL, p);
    if(ret < 0) {
513
        return SOCK_ERROR;
514
515
    }
    else if(ret == 0) {
516
        return SOCK_ERROR;
517
518
519
520
521
522
    }
    else {
        for(i=0; i < global.server_sockets; i++) {
            if(FD_ISSET(global.serversock[i], &rfds))
                return global.serversock[i];
        }
523
        return SOCK_ERROR; /* Should be impossible, stop compiler warnings */
524
525
526
527
    }
#endif
}

528
static connection_t *_accept_connection(int duration)
Jack Moffitt's avatar
Jack Moffitt committed
529
{
530
    sock_t sock, serversock;
531
    char *ip;
Jack Moffitt's avatar
Jack Moffitt committed
532

533
    serversock = wait_for_serversock (duration);
534
    if (serversock == SOCK_ERROR)
535
        return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
536

537
538
    /* malloc enough room for a full IP address (including ipv6) */
    ip = (char *)malloc(MAX_ADDR_LEN);
Jack Moffitt's avatar
Jack Moffitt committed
539

540
    sock = sock_accept(serversock, ip, MAX_ADDR_LEN);
541
    if (sock != SOCK_ERROR)
542
    {
543
        connection_t *con = NULL;
544
545
546
        /* Make any IPv4 mapped IPv6 address look like a normal IPv4 address */
        if (strncmp (ip, "::ffff:", 7) == 0)
            memmove (ip, ip+7, strlen (ip+7)+1);
Jack Moffitt's avatar
Jack Moffitt committed
547

548
549
        if (accept_ip_address (ip))
            con = connection_create (sock, serversock, ip);
550
551
552
553
554
555
556
557
        if (con)
            return con;
        sock_close (sock);
    }
    else
    {
        if (!sock_recoverable(sock_error()))
        {
558
            ICECAST_LOG_WARN("accept() failed with error %d: %s", sock_error(), strerror(sock_error()));
559
560
            thread_sleep (500000);
        }
561
562
563
    }
    free(ip);
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
564
565
566
}


567
568
569
570
571
/* Append 'node' to the connection queue. By this point the request data
 * has been collected, so the node is handed on for further processing.
 * The queue is modified while holding _connection_lock.
 */
static void _add_connection (client_queue_t *node)
{
    volatile client_queue_t **slot;

    thread_spin_lock (&_connection_lock);
    slot = _con_queue_tail;
    *slot = node;
    _con_queue_tail = (volatile client_queue_t **)&node->next;
    thread_spin_unlock (&_connection_lock);
}


580
581
582
583
584
585
/* this returns queued clients for the connection thread. headers are
 * already provided, but need to be parsed.
 */
static client_queue_t *_get_connection(void)
{
    client_queue_t *node = NULL;
Jack Moffitt's avatar
Jack Moffitt committed
586

587
588
    thread_spin_lock (&_connection_lock);

589
590
591
592
593
594
    if (_con_queue)
    {
        node = (client_queue_t *)_con_queue;
        _con_queue = node->next;
        if (_con_queue == NULL)
            _con_queue_tail = &_con_queue;
595
        node->next = NULL;
596
    }
597
598

    thread_spin_unlock (&_connection_lock);
599
600
    return node;
}
Jack Moffitt's avatar
Jack Moffitt committed
601
602


603
/* run along queue checking for any data that has come in or a timeout */
604
static void process_request_queue (void)
605
606
607
608
609
{
    client_queue_t **node_ref = (client_queue_t **)&_req_queue;
    ice_config_t *config = config_get_config ();
    int timeout = config->header_timeout;
    config_release_config();
Jack Moffitt's avatar
Jack Moffitt committed
610

611
612
613
614
615
616
    while (*node_ref)
    {
        client_queue_t *node = *node_ref;
        client_t *client = node->client;
        int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
        char *buf = client->refbuf->data + node->offset;
Jack Moffitt's avatar
Jack Moffitt committed
617

618
619
620
621
622
623
624
        if (len > 0)
        {
            if (client->con->con_time + timeout <= time(NULL))
                len = 0;
            else
                len = client_read_bytes (client, buf, len);
        }
Jack Moffitt's avatar
Jack Moffitt committed
625

626
627
628
629
630
        if (len > 0)
        {
            int pass_it = 1;
            char *ptr;

631
632
            /* handle \n, \r\n and nsvcap which for some strange reason has
             * EOL as \r\r\n */
633
634
635
636
637
638
639
            node->offset += len;
            client->refbuf->data [node->offset] = '\000';
            do
            {
                if (node->shoutcast == 1)
                {
                    /* password line */
640
641
                    if (strstr (client->refbuf->data, "\r\r\n") != NULL)
                        break;
642
643
644
645
646
647
648
                    if (strstr (client->refbuf->data, "\r\n") != NULL)
                        break;
                    if (strstr (client->refbuf->data, "\n") != NULL)
                        break;
                }
                /* stream_offset refers to the start of any data sent after the
                 * http style headers, we don't want to lose those */
649
650
651
652
653
654
                ptr = strstr (client->refbuf->data, "\r\r\n\r\r\n");
                if (ptr)
                {
                    node->stream_offset = (ptr+6) - client->refbuf->data;
                    break;
                }
655
656
657
658
659
660
661
662
663
664
665
666
667
668
                ptr = strstr (client->refbuf->data, "\r\n\r\n");
                if (ptr)
                {
                    node->stream_offset = (ptr+4) - client->refbuf->data;
                    break;
                }
                ptr = strstr (client->refbuf->data, "\n\n");
                if (ptr)
                {
                    node->stream_offset = (ptr+2) - client->refbuf->data;
                    break;
                }
                pass_it = 0;
            } while (0);
Jack Moffitt's avatar
Jack Moffitt committed
669

670
671
672
673
674
675
676
            if (pass_it)
            {
                if ((client_queue_t **)_req_queue_tail == &(node->next))
                    _req_queue_tail = (volatile client_queue_t **)node_ref;
                *node_ref = node->next;
                node->next = NULL;
                _add_connection (node);
677
                continue;
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
            }
        }
        else
        {
            if (len == 0 || client->con->error)
            {
                if ((client_queue_t **)_req_queue_tail == &node->next)
                    _req_queue_tail = (volatile client_queue_t **)node_ref;
                *node_ref = node->next;
                client_destroy (client);
                free (node);
                continue;
            }
        }
        node_ref = &node->next;
693
    }
694
    _handle_connection();
Jack Moffitt's avatar
Jack Moffitt committed
695
696
}

697

698
699
700
701
702
703
704
/* Append 'node' to the request queue, where clients wait while their
 * initial http details are read. No lock is taken here.
 */
static void _add_request_queue (client_queue_t *node)
{
    volatile client_queue_t **slot = _req_queue_tail;

    *slot = node;
    _req_queue_tail = (volatile client_queue_t **)&node->next;
}

707

708
void connection_accept_loop (void)
Jack Moffitt's avatar
Jack Moffitt committed
709
{
710
    connection_t *con;
711
    ice_config_t *config;
712
    int duration = 300;
713
714
715
716

    config = config_get_config ();
    get_ssl_certificate (config);
    config_release_config ();
Jack Moffitt's avatar
Jack Moffitt committed
717

718
    while (global.running == ICECAST_RUNNING)
719
    {
720
        con = _accept_connection (duration);
721
722

        if (con)
723
        {
724
725
726
            client_queue_t *node;
            ice_config_t *config;
            client_t *client = NULL;
727
            listener_t *listener;
728

729
730
731
732
            global_lock();
            if (client_create (&client, con, NULL) < 0)
            {
                global_unlock();
733
                client_send_error(client, 403, 1, "Icecast connection limit reached");
734
735
                /* don't be too eager as this is an imposed hard limit */
                thread_sleep (400000);
736
737
                continue;
            }
738

739
740
741
            /* setup client for reading incoming http */
            client->refbuf->data [PER_CLIENT_REFBUF_SIZE-1] = '\000';

742
743
            if (sock_set_blocking (client->con->sock, 0) || sock_set_nodelay (client->con->sock))
            {
744
                global_unlock();
745
                ICECAST_LOG_WARN("failed to set tcp options on client connection, dropping");
746
747
748
749
                client_destroy (client);
                continue;
            }

750
751
752
            node = calloc (1, sizeof (client_queue_t));
            if (node == NULL)
            {
753
                global_unlock();
754
755
756
757
758
759
                client_destroy (client);
                continue;
            }
            node->client = client;

            config = config_get_config();
760
761
762
            listener = config_get_listen_sock (config, client->con);

            if (listener)
763
            {
764
765
766
767
                if (listener->shoutcast_compat)
                    node->shoutcast = 1;
                if (listener->ssl && ssl_ok)
                    connection_uses_ssl (client->con);
768
769
                if (listener->shoutcast_mount)
                    node->shoutcast_mount = strdup (listener->shoutcast_mount);
770
            }
771
            global_unlock();
772
            config_release_config();
773
774
775

            _add_request_queue (node);
            stats_event_inc (NULL, "connections");
776
777
778
779
780
781
            duration = 5;
        }
        else
        {
            if (_req_queue == NULL)
                duration = 300; /* use longer timeouts when nothing waiting */
782
        }
783
        process_request_queue ();
784
    }
Jack Moffitt's avatar
Jack Moffitt committed
785

786
787
788
    /* Give all the other threads notification to shut down */
    thread_cond_broadcast(&global.shutdown_cond);

789
790
791
    /* wait for all the sources to shutdown */
    thread_rwlock_wlock(&_source_shutdown_rwlock);
    thread_rwlock_unlock(&_source_shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
792
793
}

794
795
796

/* Called when activating a source. Verifies that the source count is not
 * exceeded and applies any initial parameters.
797
 */
798
int connection_complete_source (source_t *source, int response)
799
{
800
    ice_config_t *config;
801
802

    global_lock ();
803
    ICECAST_LOG_DEBUG("sources count is %d", global.sources);
804

805
    config = config_get_config();
806
807
    if (global.sources < config->source_limit)
    {
808
        const char *contenttype;
809
        const char *expectcontinue;
810
        mount_proxy *mountinfo;
811
812
813
814
815
816
817
818
819
820
821
        format_type_t format_type;

        /* setup format handler */
        contenttype = httpp_getvar (source->parser, "content-type");
        if (contenttype != NULL)
        {
            format_type = format_get_type (contenttype);

            if (format_type == FORMAT_ERROR)
            {
                config_release_config();
822
                global_unlock();
823
                if (response) {
824
                    client_send_error(source->client, 403, 1, "Content-type not supported");
825
826
                    source->client = NULL;
                }
827
                ICECAST_LOG_WARN("Content-type \"%s\" not supported, dropping source", contenttype);
828
829
                return -1;
            }
830
831
832
833
        } else if (source->parser->req_type == httpp_req_put) {
            config_release_config();
            global_unlock();
            if (response) {
834
                client_send_error(source->client, 403, 1, "No Content-type given");
835
836
837
838
839
840
841
842
                source->client = NULL;
            }
            ICECAST_LOG_ERROR("Content-type not given in PUT request, dropping source");
            return -1;
        } else {
            ICECAST_LOG_ERROR("No content-type header, falling back to backwards compatibility mode "
                    "for icecast 1.x relays. Assuming content is mp3. This behaviour is deprecated "
                    "and the source client will NOT work with future Icecast versions!");
843
            format_type = FORMAT_TYPE_GENERIC;
844
845
        }

Karl Heyes's avatar
Karl Heyes committed
846
        if (format_get_plugin (format_type, source) < 0)
847
848
849
        {
            global_unlock();
            config_release_config();
850
851
            if (response)
            {
852
                client_send_error(source->client, 403, 1, "internal format allocation problem");
853
854
                source->client = NULL;
            }
855
            ICECAST_LOG_WARN("plugin format failed for \"%s\"", source->mount);
856
857
858
            return -1;
        }

859
860
861
862
863
864
865
	/* For PUT support we check for 100-continue and send back a 100 to stay in spec */
	expectcontinue = httpp_getvar (source->parser, "expect");
	if (expectcontinue != NULL)
	{
#ifdef HAVE_STRCASESTR
	    if (strcasestr (expectcontinue, "100-continue") != NULL)
#else
866
	    ICECAST_LOG_WARN("OS doesn't support case insenestive substring checks...");
867
868
869
870
871
872
873
	    if (strstr (expectcontinue, "100-continue") != NULL)
#endif
	    {
		client_send_100 (source->client);
	    }
	}

874
875
876
        global.sources++;
        stats_event_args (NULL, "sources", "%d", global.sources);
        global_unlock();
877

878
        source->running = 1;
879
        mountinfo = config_find_mount (config, source->mount, MOUNT_TYPE_NORMAL);
880
        source_update_settings (config, source, mountinfo);
881
        config_release_config();
882
        slave_rebuild_mounts();
883
884

        source->shutdown_rwlock = &_source_shutdown_rwlock;
885
        ICECAST_LOG_DEBUG("source is ready to start");
886
887
888

        return 0;
    }
889
    ICECAST_LOG_WARN("Request to add source when maximum source limit "
890
891
892
893
894
            "reached %d", global.sources);

    global_unlock();
    config_release_config();

895
896
    if (response)
    {
897
        client_send_error(source->client, 403, 1, "too many sources connected");
898
899
        source->client = NULL;
    }
900
901
902
903

    return -1;
}

Philipp Schafft's avatar
Philipp Schafft committed
904
/* Start a source client on the mountpoint `uri`.
 *
 * Reserves the mount via source_reserve(); when that yields nothing the
 * client is answered with a 403 "Mountpoint in use".  Otherwise the client's
 * parser and connection are attached to the new source and
 * connection_complete_source() is asked to finish the setup (with its
 * `response` argument set to 1).  If that fails, the source is cleared and
 * freed here and nothing else is done with the client in this function.
 *
 * On success the response code is recorded as 200 and the acknowledgement
 * depends on the protocol:
 *  - shoutcast clients get "OK2" written straight to the socket and the
 *    source client callback is invoked immediately;
 *  - all other clients get an "HTTP/1.0 200 OK" refbuf queued in front of
 *    whatever is already in client->refbuf, and the source client callback
 *    is scheduled through fserve_add_client_callback().
 */
static inline void source_startup (client_t *client, const char *uri)
{
    source_t *source = source_reserve (uri);

    if (source == NULL)
    {
        client_send_error(client, 403, 1, "Mountpoint in use");
        ICECAST_LOG_WARN("Mountpoint %s in use", uri);
        return;
    }

    source->client = client;
    source->parser = client->parser;
    source->con = client->con;

    if (connection_complete_source (source, 1) < 0)
    {
        source_clear_source (source);
        source_free_source (source);
        return;
    }

    client->respcode = 200;

    if (client->protocol != ICECAST_PROTOCOL_SHOUTCAST)
    {
        refbuf_t *ok = refbuf_new (PER_CLIENT_REFBUF_SIZE);

        snprintf (ok->data, PER_CLIENT_REFBUF_SIZE,
                "HTTP/1.0 200 OK\r\n\r\n");
        ok->len = strlen (ok->data);
        /* data already read from the client may still be pending in the
         * old refbuf, so chain it behind the reply instead of dropping it */
        ok->associated = client->refbuf;
        client->refbuf = ok;
        fserve_add_client_callback (client, source_client_callback, source);
        return;
    }

    /* shoutcast: the write is non-blocking; should only part of it go out,
     * the header timeout is left to deal with the client */
    sock_write (client->con->sock, "OK2\r\nicy-caps:11\r\n\r\n");
    source->shoutcast_compat = 1;
    source_client_callback (client, source);
}

Philipp Schafft's avatar
Philipp Schafft committed
947
948
/* Entry point for native icecast source clients (not used for shoutcast
 * style sources).
 *
 * Logs the login attempt, then hands the client to source_startup() when the
 * requested mountpoint begins with '/'.  Any other mountpoint is rejected
 * with a 400 response.
 */
static void _handle_source_request (client_t *client, const char *uri)
{
    ICECAST_LOG_INFO("Source logging in at mountpoint \"%s\" from %s",
        uri, client->con->ip);

    if (uri[0] == '/')
    {
        source_startup(client, uri);
        return;
    }

    ICECAST_LOG_WARN("source mountpoint not starting with /");
    client_send_error(client, 400, 1, "source mountpoint not starting with /");
}


/* Serve a stats client.
 *
 * Bumps the "stats_connections" counter, places an "HTTP/1.0 200 OK" header
 * in the client's existing refbuf and schedules stats_callback for the
 * client through fserve_add_client_callback().  `uri` is not consulted.
 */
static void _handle_stats_request (client_t *client, char *uri)
{
    (void)uri;

    stats_event_inc(NULL, "stats_connections");

    snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
            "HTTP/1.0 200 OK\r\n\r\n");
    client->refbuf->len = strlen (client->refbuf->data);
    client->respcode = 200;

    fserve_add_client_callback (client, stats_callback, NULL);
}

Philipp Schafft's avatar
Philipp Schafft committed
975
976
977
978
/* Attach a listener to `source`, or to one of its "fallback when full"
 * mounts if the source has no free listener slot.
 *
 * A source accepts the client when max_listeners is -1 or when its current
 * listener count is below max_listeners.  Otherwise, if the source has
 * fallback_when_full set and names a fallback_mount, that mount is looked up
 * and tried instead; at most 10 such hops are followed so that a cycle of
 * fallbacks cannot loop forever.
 *
 * Once a source accepts, the client's write/check handlers are set, its
 * refbuf is zeroed and marked as PER_CLIENT_REFBUF_SIZE long, and the client
 * is inserted into that source's pending tree.  If the accepting source is
 * not running but is on-demand, on_demand_req is raised.
 *
 * if 0 is returned then the client should not be touched, however if -1
 * is returned then the caller is responsible for handling the client
 */
static int __add_listener_to_source (source_t *source, client_t *client)
{
    size_t loop = 10;   /* remaining fallback hops we are willing to follow */

    do
    {
        ICECAST_LOG_DEBUG("max on %s is %ld (cur %lu)", source->mount,
                source->max_listeners, source->listeners);
        if (source->max_listeners == -1)
            break;
        if (source->listeners < (unsigned long)source->max_listeners)
            break;

        if (loop && source->fallback_when_full && source->fallback_mount)
        {
            source_t *next = source_find_mount (source->fallback_mount);
            if (!next) {
                /* first %s is the missing fallback, second the full source */
                ICECAST_LOG_ERROR("Fallback '%s' for full source '%s' not found",
                        source->fallback_mount, source->mount);
                return -1;
            }

            ICECAST_LOG_INFO("stream full trying %s", next->mount);
            source = next;
            loop--;
            continue;
        }
        /* now we fail the client */
        return -1;
    } while (1);

    client->write_to_client = format_generic_write_to_client;
    client->check_buffer = format_check_http_buffer;
    client->refbuf->len = PER_CLIENT_REFBUF_SIZE;
    memset (client->refbuf->data, 0, PER_CLIENT_REFBUF_SIZE);

    /* lets add the client to the active list */
    avl_tree_wlock (source->pending_tree);
    avl_insert (source->pending_tree, client);
    avl_tree_unlock (source->pending_tree);

    if (source->running == 0 && source->on_demand)
    {
        /* enable on-demand relay to start, wake up the slave thread */
        ICECAST_LOG_DEBUG("kicking off on-demand relay");
        source->on_demand_req = 1;
    }
    ICECAST_LOG_DEBUG("Added client to %s", source->mount);
    return 0;
}
1028

Philipp Schafft's avatar
Philipp Schafft committed
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
/* count the number of clients on a mount with same username and same role as the given one */
static inline ssize_t __count_user_role_on_mount (source_t *source, client_t *client) {
    ssize_t ret = 0;
    avl_node *node;

    avl_tree_rlock(source->client_tree);
    node = avl_get_first(source->client_tree);
    while (node) {
        client_t *existing_client = (client_t *)node->key;
        if (existing_client->username && client->username &&
            strcmp(existing_client->username, client->username) == 0 &&
            existing_client->role && client->role &&
            strcmp(existing_client->role, client->role) == 0)
            ret++;
        node = avl_get_next(node);
1044
    }
Philipp Schafft's avatar
Philipp Schafft committed
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
    avl_tree_unlock(source->client_tree);

    avl_tree_rlock(source->pending_tree);
    node = avl_get_first(source->pending_tree);
    while (node)
    {
        client_t *existing_client = (client_t *)node->key;
        if (existing_client->username && client->username &&
            strcmp(existing_client->username, client->username) == 0 &&
            existing_client->role && client->role &&
            strcmp(existing_client->role, client->role) == 0)
            ret++;
        node = avl_get_next(node);
1058
    }
Philipp Schafft's avatar
Philipp Schafft committed
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
    avl_tree_unlock(source->pending_tree);

    return ret;
}

/* Dispatch an HTTP GET client.
 *
 * GET clients come in several flavours and are tried in this order:
 *  1. admin requests ("/admin.cgi" or anything below "/admin/") go to
 *     admin_handle_request();
 *  2. everything else is a web request and must pass acl_test_web();
 *     a refused shoutcast client is destroyed, any other refused client
 *     receives a 401;
 *  3. URIs whose extension is classified as XSLT content are rendered with
 *     stats_transform_xslt();
 *  4. URIs that name a mounted source make the client a listener, subject to
 *     the per-user connection limit and a maximum listening duration;
 *  5. anything left is treated as a static file and given to
 *     fserve_client_create().
 *
 * The global source tree is read-locked while the mount is looked up and
 * while the listener is being attached.
 */
static void _handle_get_request (client_t *client, char *uri) {
    source_t *source = NULL;
    ssize_t per_user_limit;

    ICECAST_LOG_DEBUG("Got client %p with URI %H", client, uri);

    stats_event_inc(NULL, "client_connections");

    /* admin interface */
    if (strcmp(uri, "/admin.cgi") == 0 || strncmp(uri, "/admin/", 7) == 0) {
        ICECAST_LOG_DEBUG("Client %p requesting admin interface.", client);
        admin_handle_request(client, uri);
        return;
    }

    /* from here on it is a web/ request; check the ACL allows it */
    if (acl_test_web(client->acl) != ACL_POLICY_ALLOW) {
        if (client->protocol != ICECAST_PROTOCOL_SHOUTCAST)
            client_send_error(client, 401, 1, "You need to authenticate\r\n");
        else
            client_destroy(client);
        return;
    }

    /* XSL-transformed stats page */
    if (util_check_valid_extension(uri) == XSLT_CONTENT) {
        ICECAST_LOG_DEBUG("Stats request, sending XSL transformed stats");
        stats_transform_xslt(client, uri);
        return;
    }

    /* a mounted source, or just a file to serve? */
    avl_tree_rlock(global.source_tree);
    source = source_find_mount(uri);
    if (source == NULL) {
        avl_tree_unlock(global.source_tree);
        fserve_client_create(client, uri);
        return;
    }

    /* duplicate login check: -1 = not set (unlimited by default), 0 = unlimited */
    per_user_limit = acl_get_max_connections_per_user(client->acl);
    if (per_user_limit > 0 &&
        per_user_limit <= __count_user_role_on_mount(source, client)) {
        /* the client has been handed off; do not touch it again below */
        client_send_error(client, 403, 1, "Reached limit of concurrent connections on those credentials");
    } else {
        /* set the maximum listening duration unless one is already in place */
        if (client->con->discon_time == 0) {
            time_t duration = acl_get_max_connection_duration(client->acl);

            if (duration == -1) {
                /* the ACL has no opinion, fall back to the mount's setting */
                ice_config_t *config = config_get_config();
                mount_proxy *mount = config_find_mount(config, source->mount, MOUNT_TYPE_NORMAL);

                if (mount && mount->max_listener_duration)
                    duration = mount->max_listener_duration;
                config_release_config();
            }

            /* -1 = not set (unlimited by default), 0 = unlimited */
            if (duration > 0)
                client->con->discon_time = duration + time(NULL);
        }

        if (__add_listener_to_source(source, client) == -1)
            client_send_error(client, 403, 1, "Rejecting client for whatever reason");
    }
    avl_tree_unlock(global.source_tree);
}

1144
1145
static void _handle_shoutcast_compatible (client_queue_t *node)
{
1146
1147
1148
    char *http_compliant;
    int http_compliant_len = 0;
    http_parser_t *parser;
Philipp Schafft's avatar
Philipp Schafft committed
1149
    const char *shoutcast_mount;
1150
    client_t *client = node->client;
Philipp Schafft's avatar
Philipp Schafft committed
1151
    ice_config_t *config;
1152

1153
1154
    if (node->shoutcast == 1)
    {
Philipp Schafft's avatar
Philipp Schafft committed
1155
        char *ptr, *headers;
1156
1157

        /* Get rid of trailing \r\n or \n after password */
1158
        ptr = strstr (client->refbuf->data, "\r\r\n");
1159
        if (ptr)
1160
            headers = ptr+3;
1161
1162
        else
        {
1163
            ptr = strstr (client->refbuf->data, "\r\n");
1164
            if (ptr)
1165
1166
1167
1168
1169
1170
1171
                headers = ptr+2;
            else
            {
                ptr = strstr (client->refbuf->data, "\n");
                if (ptr)
                    headers = ptr+1;
            }
1172
        }
1173
1174
1175
1176

        if (ptr == NULL)
        {
            client_destroy (client);
1177
            free (node->shoutcast_mount);
1178
1179
1180
1181
1182
            free (node);
            return;
        }
        *ptr = '\0';