connection.c 55.8 KB
Newer Older
1
2
3
4
5
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
6
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
7
8
9
10
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
Philipp Schafft's avatar
Philipp Schafft committed
11
 * Copyright 2011,      Dave 'justdave' Miller <justdave@mozilla.com>,
12
 * Copyright 2011-2018, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
13
14
 */

15
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
16
17
18
19
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
20
21
#include <stdio.h>
#include <stdlib.h>
22
#include <errno.h>
Jack Moffitt's avatar
Jack Moffitt committed
23
#include <string.h>
24
#ifdef HAVE_POLL
25
#include <poll.h>
26
#endif
27
#include <sys/types.h>
28
29

#ifndef _WIN32
Jack Moffitt's avatar
Jack Moffitt committed
30
31
#include <sys/socket.h>
#include <netinet/in.h>
32
#else
33
#include <winsock2.h>
34
#endif
Jack Moffitt's avatar
Jack Moffitt committed
35

Marvin Scholz's avatar
Marvin Scholz committed
36
37
38
39
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "common/net/sock.h"
#include "common/httpp/httpp.h"
Jack Moffitt's avatar
Jack Moffitt committed
40

41
42
#include "compat.h"
#include "connection.h"
43
#include "cfgfile.h"
Jack Moffitt's avatar
Jack Moffitt committed
44
45
#include "global.h"
#include "util.h"
46
#include "refobject.h"
Jack Moffitt's avatar
Jack Moffitt committed
47
48
#include "refbuf.h"
#include "client.h"
49
#include "errors.h"
Jack Moffitt's avatar
Jack Moffitt committed
50
51
#include "stats.h"
#include "logging.h"
52
#include "fserve.h"
53
#include "slave.h"
54

Jack Moffitt's avatar
Jack Moffitt committed
55
#include "source.h"
56
#include "admin.h"
Michael Smith's avatar
Michael Smith committed
57
#include "auth.h"
58
#include "matchfile.h"
59
#include "tls.h"
60
#include "acl.h"
61
62
#include "refobject.h"
#include "listensocket.h"
63
#include "fastevent.h"
Jack Moffitt's avatar
Jack Moffitt committed
64
65
66

#define CATMODULE "connection"

67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/* Two different major types of source authentication.
   Shoutcast style is used only by the Shoutcast DSP
   and is a crazy version of HTTP.  It looks like :
     Source Client -> Connects to port + 1
     Source Client -> sends encoder password (plaintext)\r\n
     Icecast -> reads encoder password, if ok, sends OK2\r\n, else disconnects
     Source Client -> reads OK2\r\n, then sends http-type request headers
                      that contain the stream details (icy-name, etc..)
     Icecast -> reads headers, stores them
     Source Client -> starts sending MP3 data
     Source Client -> periodically updates metadata via admin.cgi call

   Icecast auth style uses HTTP and Basic Authorization.
*/

82
83
84
85
/* One entry of the request / connection / body queues. The queues are
 * singly linked through `next`. */
typedef struct client_queue_tag {
    /* client this entry carries */
    client_t *client;
    /* number of bytes of client->refbuf->data filled so far */
    int offset;
    /* set to 1 when the listener has shoutcast_compat enabled */
    int shoutcast;
    /* strdup()ed copy of the listener's shoutcast mount, or NULL */
    char *shoutcast_mount;
    /* buffer used while reading a request body, or NULL */
    char *bodybuffer;
    /* size of bodybuffer; handed by pointer to client_body_slurp() */
    size_t bodybufferlen;
    /* set to 1 once the body queue has looked at this entry */
    int tried_body;
    /* following entry in the queue */
    struct client_queue_tag *next;
} client_queue_t;
Jack Moffitt's avatar
Jack Moffitt committed
92

93
static spin_t _connection_lock; // protects _current_id, _con_queue, _con_queue_tail
94
static volatile connection_id_t _current_id = 0;
Jack Moffitt's avatar
Jack Moffitt committed
95
96
static int _initialized = 0;

97
98
static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue;
static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
99
static volatile client_queue_t *_body_queue = NULL, **_body_queue_tail = &_body_queue;
100
static int tls_ok;
101
static tls_ctx_t *tls_ctx;
102

103
/* filtering client connection based on IP */
104
static matchfile_t *banned_ip, *allowed_ip;
105

106
rwlock_t _source_shutdown_rwlock;
Jack Moffitt's avatar
Jack Moffitt committed
107

108
static int  _update_admin_command(client_t *client);
109
static void _handle_connection(void);
110
static void get_tls_certificate(ice_config_t *config);
Jack Moffitt's avatar
Jack Moffitt committed
111
112
113

void connection_initialize(void)
{
Marvin Scholz's avatar
Marvin Scholz committed
114
115
    if (_initialized)
        return;
116

117
    thread_spin_create (&_connection_lock);
118
    thread_mutex_create(&move_clients_mutex);
119
    thread_rwlock_create(&_source_shutdown_rwlock);
120
    thread_cond_create(&global.shutdown_cond);
121
122
123
124
    _req_queue = NULL;
    _req_queue_tail = &_req_queue;
    _con_queue = NULL;
    _con_queue_tail = &_con_queue;
125
126
    _body_queue = NULL;
    _body_queue_tail = &_body_queue;
Jack Moffitt's avatar
Jack Moffitt committed
127

128
    _initialized = 1;
Jack Moffitt's avatar
Jack Moffitt committed
129
130
131
132
}

void connection_shutdown(void)
{
Marvin Scholz's avatar
Marvin Scholz committed
133
134
    if (!_initialized)
        return;
135

136
    tls_ctx_unref(tls_ctx);
137
138
139
    matchfile_release(banned_ip);
    matchfile_release(allowed_ip);
 
140
    thread_cond_destroy(&global.shutdown_cond);
141
    thread_rwlock_destroy(&_source_shutdown_rwlock);
142
    thread_spin_destroy (&_connection_lock);
143
    thread_mutex_destroy(&move_clients_mutex);
Jack Moffitt's avatar
Jack Moffitt committed
144

145
    _initialized = 0;
Jack Moffitt's avatar
Jack Moffitt committed
146
147
}

148
/* Apply a (re)loaded configuration: rebuild the TLS context first, then
 * reconfigure and set up the global listen sockets. */
void connection_reread_config(ice_config_t *config)
{
    get_tls_certificate(config);

    listensocket_container_configure_and_setup(global.listensockets, config);
}

154
static connection_id_t _next_connection_id(void)
Jack Moffitt's avatar
Jack Moffitt committed
155
{
156
    connection_id_t id;
Jack Moffitt's avatar
Jack Moffitt committed
157

158
    thread_spin_lock(&_connection_lock);
159
    id = _current_id++;
160
    thread_spin_unlock(&_connection_lock);
Jack Moffitt's avatar
Jack Moffitt committed
161

162
    return id;
Jack Moffitt's avatar
Jack Moffitt committed
163
164
}

165

166
#ifdef ICECAST_CAP_TLS
167
/* Replace the module's TLS context with one built from the configured
 * certificate, key and cipher list. When no key file is configured the
 * certificate file is used as key file as well. tls_ok and config->tls_ok
 * are 1 afterwards only if the new context could be created. */
static void get_tls_certificate(ice_config_t *config)
{
    const char *keyfile = config->tls_context.key_file;

    if (keyfile == NULL) {
        keyfile = config->tls_context.cert_file;
    }

    /* assume failure until the new context exists */
    tls_ok = 0;
    config->tls_ok = 0;

    tls_ctx_unref(tls_ctx);
    tls_ctx = tls_ctx_new(config->tls_context.cert_file, keyfile, config->tls_context.cipher_list);
    if (tls_ctx == NULL) {
        ICECAST_LOG_INFO("No TLS capability on any configured ports");
        return;
    }

    tls_ok = 1;
    config->tls_ok = 1;
}


188
/* handlers for reading and writing a connection_t when there is TLS
189
190
 * configured on the listening port
 */
191
static int connection_read_tls(connection_t *con, void *buf, size_t len)
192
{
193
    ssize_t bytes = tls_read(con->tls, buf, len);
194

Marvin Scholz's avatar
Marvin Scholz committed
195
    if (bytes < 0) {
196
        if (tls_want_io(con->tls) > 0)
197
            return -1;
198
199
200
201
202
        con->error = 1;
    }
    return bytes;
}

203
/* Write through the TLS layer. Successful writes are added to
 * con->sent_bytes. A negative tls_write() result yields -1 without touching
 * con->error when tls_want_io() reports pending I/O; otherwise con->error is
 * set and the negative result is returned. */
static int connection_send_tls(connection_t *con, const void *buf, size_t len)
{
    ssize_t written = tls_write(con->tls, buf, len);

    if (written >= 0) {
        con->sent_bytes += written;
        return written;
    }

    if (tls_want_io(con->tls) > 0) {
        return -1;
    }

    con->error = 1;
    return written;
}
#else

219
220
/* TLS not compiled in, so at least log it.
 *
 * Both the module flag and config->tls_ok are cleared, matching what the
 * TLS-enabled variant of this function does when no context is available.
 */
static void get_tls_certificate(ice_config_t *config)
{
    config->tls_ok = tls_ok = 0;
    ICECAST_LOG_INFO("No TLS capability. "
                     "Rebuild Icecast with OpenSSL support to enable this.");
}
226
#endif /* ICECAST_CAP_TLS */
227
228
229
230
231


/* handlers (default) for reading and writing a connection_t, no encryption
 * used, just straight access to the socket
 */
Marvin Scholz's avatar
Marvin Scholz committed
232
/* Read from the plain socket. con->error is set when sock_read_bytes()
 * returns 0, or returns -1 with an error that sock_recoverable() rejects.
 * The raw result is returned either way. */
static int connection_read(connection_t *con, void *buf, size_t len)
{
    int got = sock_read_bytes(con->sock, buf, len);

    if (got == 0) {
        con->error = 1;
    } else if (got == -1) {
        if (!sock_recoverable(sock_error())) {
            con->error = 1;
        }
    }

    return got;
}

Marvin Scholz's avatar
Marvin Scholz committed
242
/* Write to the plain socket. Successful writes are added to con->sent_bytes;
 * a negative result sets con->error unless sock_recoverable() accepts the
 * socket error. The raw result is returned either way. */
static int connection_send(connection_t *con, const void *buf, size_t len)
{
    int sent = sock_write_bytes(con->sock, buf, len);

    if (sent >= 0) {
        con->sent_bytes += sent;
    } else if (!sock_recoverable(sock_error())) {
        con->error = 1;
    }

    return sent;
}

255
/* Allocate a connection_t for an accepted socket.
 *
 * Returns NULL right away when the allow/deny match files reject `ip`.
 * Otherwise the connection is allocated, takes a reference on both listen
 * sockets, stores `ip` as passed (no copy is made) and starts out with the
 * plain socket handlers in TLS auto mode. The CONNECTION_CREATE fast event
 * is emitted with the result, which is NULL if the allocation failed.
 */
connection_t *connection_create(sock_t sock, listensocket_t *listensocket_real, listensocket_t* listensocket_effective, char *ip)
{
    connection_t *con;

    if (!matchfile_match_allow_deny(allowed_ip, banned_ip, ip)) {
        return NULL;
    }

    con = calloc(1, sizeof(*con));
    if (con != NULL) {
        refobject_ref(listensocket_real);
        refobject_ref(listensocket_effective);
        con->listensocket_real = listensocket_real;
        con->listensocket_effective = listensocket_effective;

        con->sock = sock;
        con->ip = ip;
        con->con_time = time(NULL);
        con->id = _next_connection_id();

        con->tlsmode = ICECAST_TLSMODE_AUTO;
        con->read = connection_read;
        con->send = connection_send;
    }

    fastevent_emit(FASTEVENT_TYPE_CONNECTION_CREATE, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_CONNECTION, con);

    return con;
}

283
/* prepare connection for interacting over a TLS connection
284
 */
285
void connection_uses_tls(connection_t *con)
286
{
287
#ifdef ICECAST_CAP_TLS
288
    if (con->tls)
289
290
        return;

291
292
293
294
295
296
    if (con->readbufferlen) {
        ICECAST_LOG_ERROR("Connection is now using TLS but has data put back. BAD. Discarding putback data.");
        free(con->readbuffer);
        con->readbufferlen = 0;
    }

297
    con->tlsmode = ICECAST_TLSMODE_RFC2818;
298
299
    con->read = connection_read_tls;
    con->send = connection_send_tls;
300
301
302
    con->tls = tls_new(tls_ctx);
    tls_set_incoming(con->tls);
    tls_set_socket(con->tls, con->sock);
303
304
305
#endif
}

306
307
/* Send through the connection's current send handler and report the attempt
 * (buffer, requested length and result) as a CONNECTION_WRITE fast event. */
ssize_t connection_send_bytes(connection_t *con, const void *buf, size_t len)
{
    ssize_t sent;

    sent = con->send(con, buf, len);
    fastevent_emit(FASTEVENT_TYPE_CONNECTION_WRITE, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_OBRD, con, buf, len, sent);

    return sent;
}

315
/* Read up to `len` bytes into `buf`, serving data from the putback buffer
 * before asking the connection's read handler.
 *
 * - Putback buffer holds more than `len` bytes: only the putback buffer is
 *   used, the remainder is moved to its front, `len` is returned.
 * - Putback buffer holds exactly `len` bytes: it is consumed and freed,
 *   `len` is returned.
 * - Putback buffer holds fewer bytes: it is consumed and freed, then the read
 *   handler is asked for the rest. If that fails, the bytes already copied
 *   are still returned.
 * - Putback buffer empty: the read handler's result is returned as is.
 *
 * Offsets are computed on char pointers; arithmetic on void pointers is a
 * compiler extension, not ISO C.
 */
static inline ssize_t connection_read_bytes_real(connection_t *con, void *buf, size_t len)
{
    char *out = buf;
    ssize_t done = 0;
    ssize_t ret;

    if (con->readbufferlen) {
        ICECAST_LOG_DEBUG("On connection %p we read from putback buffer, filled with %zu bytes, requested are %zu bytes", con, con->readbufferlen, len);
        if (len >= con->readbufferlen) {
            memcpy(out, con->readbuffer, con->readbufferlen);
            free(con->readbuffer);
            con->readbuffer = NULL;
            ICECAST_LOG_DEBUG("New fill in buffer=<empty>");
            if (len == con->readbufferlen) {
                con->readbufferlen = 0;
                return len;
            } else {
                len -= con->readbufferlen;
                out += con->readbufferlen;
                done = con->readbufferlen;
                con->readbufferlen = 0;
            }
        } else {
            char *pending = (char *)con->readbuffer;

            memcpy(out, pending, len);
            /* regions overlap, hence memmove */
            memmove(pending, pending + len, con->readbufferlen - len);
            con->readbufferlen -= len;
            return len;
        }
    }

    ret = con->read(con, out, len);

    if (ret < 0) {
        /* report the failure only if nothing was delivered from putback */
        if (done == 0) {
            return ret;
        } else {
            return done;
        }
    }

    return done + ret;
}

357
358
359
360
361
362
363
364
365
/* Read via connection_read_bytes_real() and report the attempt (buffer,
 * requested length and result) as a CONNECTION_READ fast event. */
ssize_t connection_read_bytes(connection_t *con, void *buf, size_t len)
{
    ssize_t got;

    got = connection_read_bytes_real(con, buf, len);
    fastevent_emit(FASTEVENT_TYPE_CONNECTION_READ, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_OBRD, con, buf, len, got);

    return got;
}

366
367
368
369
/* Store `len` bytes from `buf` in the connection's putback buffer so that
 * later reads see them before any new socket data. Data is appended when the
 * putback buffer already holds bytes.
 *
 * The CONNECTION_PUTBACK fast event is emitted for every call. A zero-length
 * put-back is then a successful no-op: allocating zero bytes may yield NULL
 * (a spurious failure) or a buffer that is stored with a length of 0 and
 * overwritten by the next put-back.
 *
 * Returns 0 on success, -1 if memory could not be allocated (the existing
 * putback data is left untouched in that case).
 */
int connection_read_put_back(connection_t *con, const void *buf, size_t len)
{
    char *n;

    fastevent_emit(FASTEVENT_TYPE_CONNECTION_PUTBACK, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_OBR, con, buf, len);

    if (len == 0)
        return 0;

    if (con->readbufferlen) {
        n = realloc(con->readbuffer, con->readbufferlen + len);
        if (!n)
            return -1;

        /* char pointer: arithmetic on void pointers is not ISO C */
        memcpy(n + con->readbufferlen, buf, len);
        con->readbuffer = n;
        con->readbufferlen += len;
    } else {
        n = malloc(len);
        if (!n)
            return -1;

        memcpy(n, buf, len);
        con->readbuffer = n;
        con->readbufferlen = len;
    }

    ICECAST_LOG_DEBUG("On connection %p %zu bytes have been put back.", con, len);
    return 0;
}

396
397
398
399
/* add client to connection queue. At this point some header information
 * has been collected, so we now pass it onto the connection thread for
 * further processing
 */
400
static void _add_connection(client_queue_t *node)
Jack Moffitt's avatar
Jack Moffitt committed
401
{
402
    thread_spin_lock(&_connection_lock);
403
    *_con_queue_tail = node;
404
405
    _con_queue_tail = (volatile client_queue_t **) &node->next;
    thread_spin_unlock(&_connection_lock);
Jack Moffitt's avatar
Jack Moffitt committed
406
407
408
}


409
410
411
412
413
414
/* this returns queued clients for the connection thread. headers are
 * already provided, but need to be parsed.
 */
static client_queue_t *_get_connection(void)
{
    client_queue_t *node = NULL;
Jack Moffitt's avatar
Jack Moffitt committed
415

Marvin Scholz's avatar
Marvin Scholz committed
416
    thread_spin_lock(&_connection_lock);
417

Marvin Scholz's avatar
Marvin Scholz committed
418
    if (_con_queue){
419
420
421
422
        node = (client_queue_t *)_con_queue;
        _con_queue = node->next;
        if (_con_queue == NULL)
            _con_queue_tail = &_con_queue;
423
        node->next = NULL;
424
    }
425

Marvin Scholz's avatar
Marvin Scholz committed
426
    thread_spin_unlock(&_connection_lock);
427
428
    return node;
}
Jack Moffitt's avatar
Jack Moffitt committed
429
430


431
/* run along queue checking for any data that has come in or a timeout */
432
static void process_request_queue (void)
433
434
{
    client_queue_t **node_ref = (client_queue_t **)&_req_queue;
435
436
437
438
439
440
    ice_config_t *config;
    int timeout;
    char peak;

    config = config_get_config();
    timeout = config->header_timeout;
441
    config_release_config();
Jack Moffitt's avatar
Jack Moffitt committed
442

Marvin Scholz's avatar
Marvin Scholz committed
443
    while (*node_ref) {
444
445
446
447
        client_queue_t *node = *node_ref;
        client_t *client = node->client;
        int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
        char *buf = client->refbuf->data + node->offset;
Jack Moffitt's avatar
Jack Moffitt committed
448

449
        if (client->con->tlsmode == ICECAST_TLSMODE_AUTO || client->con->tlsmode == ICECAST_TLSMODE_AUTO_NO_PLAIN) {
450
451
            if (recv(client->con->sock, &peak, 1, MSG_PEEK) == 1) {
                if (peak == 0x16) { /* TLS Record Protocol Content type 0x16 == Handshake */
452
                    connection_uses_tls(client->con);
453
454
455
456
                }
            }
        }

Marvin Scholz's avatar
Marvin Scholz committed
457
458
        if (len > 0) {
            if (client->con->con_time + timeout <= time(NULL)) {
459
                len = 0;
Marvin Scholz's avatar
Marvin Scholz committed
460
461
462
            } else {
                len = client_read_bytes(client, buf, len);
            }
463
        }
Jack Moffitt's avatar
Jack Moffitt committed
464

Marvin Scholz's avatar
Marvin Scholz committed
465
        if (len > 0) {
466
            ssize_t stream_offset = -1;
467
468
469
            int pass_it = 1;
            char *ptr;

470
471
            /* handle \n, \r\n and nsvcap which for some strange reason has
             * EOL as \r\r\n */
472
            node->offset += len;
Marvin Scholz's avatar
Marvin Scholz committed
473
474
475
            client->refbuf->data[node->offset] = '\000';
            do {
                if (node->shoutcast == 1) {
476
                    /* password line */
477
478
                    if (strstr (client->refbuf->data, "\r\r\n") != NULL)
                        break;
479
480
481
482
483
484
485
                    if (strstr (client->refbuf->data, "\r\n") != NULL)
                        break;
                    if (strstr (client->refbuf->data, "\n") != NULL)
                        break;
                }
                /* stream_offset refers to the start of any data sent after the
                 * http style headers, we don't want to lose those */
Marvin Scholz's avatar
Marvin Scholz committed
486
487
                ptr = strstr(client->refbuf->data, "\r\r\n\r\r\n");
                if (ptr) {
488
                    stream_offset = (ptr+6) - client->refbuf->data;
489
490
                    break;
                }
Marvin Scholz's avatar
Marvin Scholz committed
491
492
                ptr = strstr(client->refbuf->data, "\r\n\r\n");
                if (ptr) {
493
                    stream_offset = (ptr+4) - client->refbuf->data;
494
495
                    break;
                }
Marvin Scholz's avatar
Marvin Scholz committed
496
497
                ptr = strstr(client->refbuf->data, "\n\n");
                if (ptr) {
498
                    stream_offset = (ptr+2) - client->refbuf->data;
499
500
501
502
                    break;
                }
                pass_it = 0;
            } while (0);
Jack Moffitt's avatar
Jack Moffitt committed
503

Marvin Scholz's avatar
Marvin Scholz committed
504
            if (pass_it) {
505
506
507
508
                if (stream_offset != -1) {
                    connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset);
                    node->offset = stream_offset;
                }
509
510
511
512
                if ((client_queue_t **)_req_queue_tail == &(node->next))
                    _req_queue_tail = (volatile client_queue_t **)node_ref;
                *node_ref = node->next;
                node->next = NULL;
Marvin Scholz's avatar
Marvin Scholz committed
513
                _add_connection(node);
514
                continue;
515
            }
Marvin Scholz's avatar
Marvin Scholz committed
516
517
        } else {
            if (len == 0 || client->con->error) {
518
519
520
                if ((client_queue_t **)_req_queue_tail == &node->next)
                    _req_queue_tail = (volatile client_queue_t **)node_ref;
                *node_ref = node->next;
Marvin Scholz's avatar
Marvin Scholz committed
521
522
                client_destroy(client);
                free(node);
523
524
525
526
                continue;
            }
        }
        node_ref = &node->next;
527
    }
528
    _handle_connection();
Jack Moffitt's avatar
Jack Moffitt committed
529
530
}

531
532
533
534
535
536
537
538
539
540
541
542
/* Append a node to the body queue (logged at debug level). The append itself
 * happens under _connection_lock; node->next is not modified here.
 */
static void _add_body_client(client_queue_t *node)
{
    volatile client_queue_t **next_slot = (volatile client_queue_t **)&node->next;

    ICECAST_LOG_DEBUG("Putting client %p in body queue.", node->client);

    thread_spin_lock(&_connection_lock);
    *_body_queue_tail = node;
    _body_queue_tail = next_slot;
    thread_spin_unlock(&_connection_lock);
}

543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
/* Advance reading (or skipping) the request body of a single queued client.
 *
 * For POST requests whose body has not been touched yet a body buffer is
 * allocated: body_size_limit bytes when the body length is negative, the
 * announced length when it does not exceed body_size_limit, and no buffer
 * otherwise. With a buffer the body is slurped and, once complete, parsed as
 * POST data and the buffer released; without one the body is skipped.
 *
 * `timeout` is compared against the connection time as a point in time: an
 * unfinished body on a connection created at or before it, or one that has
 * already read body_size_limit bytes, yields CLIENT_SLURP_ERROR. Otherwise
 * the slurp/skip result is returned.
 */
static client_slurp_result_t process_request_body_queue_one(client_queue_t *node, time_t timeout, size_t body_size_limit)
{
    client_t *client = node->client;
    client_slurp_result_t res;

    if (client->parser->req_type == httpp_req_post &&
        node->bodybuffer == NULL && client->request_body_read == 0) {
        int want_buffer = 1;

        if (client->request_body_length < 0) {
            node->bodybufferlen = body_size_limit;
        } else if (client->request_body_length <= (ssize_t)body_size_limit) {
            node->bodybufferlen = client->request_body_length;
        } else {
            want_buffer = 0;
        }

        if (want_buffer) {
            node->bodybuffer = malloc(node->bodybufferlen);
        }
    }

    if (node->bodybuffer == NULL) {
        res = client_body_skip(client);
    } else {
        res = client_body_slurp(client, node->bodybuffer, &(node->bodybufferlen));
        if (res == CLIENT_SLURP_SUCCESS) {
            httpp_parse_postdata(client->parser, node->bodybuffer, node->bodybufferlen);
            free(node->bodybuffer);
            node->bodybuffer = NULL;
        }
    }

    if (res == CLIENT_SLURP_SUCCESS) {
        return res;
    }

    if (client->con->con_time <= timeout || client->request_body_read >= body_size_limit) {
        return CLIENT_SLURP_ERROR;
    }

    return res;
}
579
580
581
582
583

/* This queue reads data from the body of clients. */
static void process_request_body_queue (void)
{
    client_queue_t **node_ref = (client_queue_t **)&_body_queue;
584
585
586
    ice_config_t *config;
    time_t timeout;
    size_t body_size_limit;
587
588
589
590
591

    ICECAST_LOG_DEBUG("Processing body queue.");

    ICECAST_LOG_DEBUG("_body_queue=%p, &_body_queue=%p, _body_queue_tail=%p", _body_queue, &_body_queue, _body_queue_tail);

592
593
594
595
596
    config = config_get_config();
    timeout = time(NULL) - config->body_timeout;
    body_size_limit = config->body_size_limit;
    config_release_config();

597
598
599
600
601
    while (*node_ref) {
        client_queue_t *node = *node_ref;
        client_t *client = node->client;
        client_slurp_result_t res;

602
603
        node->tried_body = 1;

604
605
        ICECAST_LOG_DEBUG("Got client %p in body queue.", client);

606
        res = process_request_body_queue_one(node, timeout, body_size_limit);
607

608
        if (res != CLIENT_SLURP_NEEDS_MORE_DATA) {
609
610
611
612
613
614
615
616
617
618
619
620
            ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client);

            if ((client_queue_t **)_body_queue_tail == &(node->next))
                _body_queue_tail = (volatile client_queue_t **)node_ref;
            *node_ref = node->next;
            node->next = NULL;
            _add_connection(node);
            continue;
        }
        node_ref = &node->next;
    }
}
621

622
623
624
/* add node to the queue of requests. This is where the clients are when
 * initial http details are read.
 */
Marvin Scholz's avatar
Marvin Scholz committed
625
static void _add_request_queue(client_queue_t *node)
626
627
628
{
    *_req_queue_tail = node;
    _req_queue_tail = (volatile client_queue_t **)&node->next;
Jack Moffitt's avatar
Jack Moffitt committed
629
630
}

631
632
633
/* Allocate a zeroed queue node for `client` and fill it from the listener of
 * the connection's effective listen socket, if there is one: shoutcast
 * compatibility flag, TLS mode (switching the connection to TLS right away
 * for RFC2818 listeners while TLS is available) and a copy of the shoutcast
 * mount. Returns NULL when the node cannot be allocated. */
static client_queue_t *create_client_node(client_t *client)
{
    const listener_t *listener;
    client_queue_t *node;

    node = calloc(1, sizeof(*node));
    if (node == NULL) {
        return NULL;
    }

    node->client = client;

    listener = listensocket_get_listener(client->con->listensocket_effective);
    if (listener != NULL) {
        if (listener->shoutcast_compat) {
            node->shoutcast = 1;
        }

        client->con->tlsmode = listener->tls;
        if (tls_ok && listener->tls == ICECAST_TLSMODE_RFC2818) {
            connection_uses_tls(client->con);
        }

        if (listener->shoutcast_mount) {
            node->shoutcast_mount = strdup(listener->shoutcast_mount);
        }
    }

    /* released whether or not a listener was returned */
    listensocket_release_listener(client->con->listensocket_effective);

    return node;
}
657

658
659
660
661
662
663
664
665
/* Turn an accepted connection into a client and put it on the request queue.
 *
 * Client and queue node are created under the global lock. When
 * client_create() fails a client-limit error is sent and the caller is held
 * back by a short sleep. When the socket options cannot be set, or no queue
 * node can be allocated, the client is destroyed. On success the
 * "connections" statistic is incremented.
 */
void connection_queue(connection_t *con)
{
    client_t *client = NULL;
    client_queue_t *entry;

    global_lock();

    if (client_create(&client, con, NULL) < 0) {
        global_unlock();
        client_send_error_by_id(client, ICECAST_ERROR_GEN_CLIENT_LIMIT);
        /* don't be too eager as this is an imposed hard limit */
        thread_sleep(400000);
        return;
    }

    /* setup client for reading incoming http */
    client->refbuf->data[PER_CLIENT_REFBUF_SIZE - 1] = '\0';

    if (sock_set_blocking(client->con->sock, 0) || sock_set_nodelay(client->con->sock)) {
        global_unlock();
        ICECAST_LOG_WARN("Failed to set tcp options on client connection, dropping");
        client_destroy(client);
        return;
    }

    entry = create_client_node(client);
    global_unlock();

    if (!entry) {
        client_destroy(client);
        return;
    }

    _add_request_queue(entry);
    stats_event_inc(NULL, "connections");
}

Marvin Scholz's avatar
Marvin Scholz committed
693
void connection_accept_loop(void)
Jack Moffitt's avatar
Jack Moffitt committed
694
{
695
    connection_t *con;
696
    ice_config_t *config;
697
    int duration = 300;
698

Marvin Scholz's avatar
Marvin Scholz committed
699
    config = config_get_config();
700
    get_tls_certificate(config);
Marvin Scholz's avatar
Marvin Scholz committed
701
    config_release_config();
Jack Moffitt's avatar
Jack Moffitt committed
702

Marvin Scholz's avatar
Marvin Scholz committed
703
    while (global.running == ICECAST_RUNNING) {
704
        con = listensocket_container_accept(global.listensockets, duration);
705

Marvin Scholz's avatar
Marvin Scholz committed
706
        if (con) {
707
            connection_queue(con);
708
            duration = 5;
Marvin Scholz's avatar
Marvin Scholz committed
709
        } else {
710
711
            if (_req_queue == NULL)
                duration = 300; /* use longer timeouts when nothing waiting */
712
        }
Marvin Scholz's avatar
Marvin Scholz committed
713
        process_request_queue();
714
        process_request_body_queue();
715
    }
Jack Moffitt's avatar
Jack Moffitt committed
716

717
718
719
    /* Give all the other threads notification to shut down */
    thread_cond_broadcast(&global.shutdown_cond);

720
721
722
    /* wait for all the sources to shutdown */
    thread_rwlock_wlock(&_source_shutdown_rwlock);
    thread_rwlock_unlock(&_source_shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
723
724
}

725
726
727

/* Called when activating a source. Verifies that the source count is not
 * exceeded and applies any initial parameters.
728
 */
Marvin Scholz's avatar
Marvin Scholz committed
729
int connection_complete_source(source_t *source, int response)
730
{
731
    ice_config_t *config;
732

Marvin Scholz's avatar
Marvin Scholz committed
733
    global_lock();
734
    ICECAST_LOG_DEBUG("sources count is %d", global.sources);
735

736
    config = config_get_config();
Marvin Scholz's avatar
Marvin Scholz committed
737
    if (global.sources < config->source_limit) {
738
        const char *contenttype;
739
        mount_proxy *mountinfo;
740
741
742
743
        format_type_t format_type;

        /* setup format handler */
        contenttype = httpp_getvar (source->parser, "content-type");
Marvin Scholz's avatar
Marvin Scholz committed
744
745
        if (contenttype != NULL) {
            format_type = format_get_type(contenttype);
746

Marvin Scholz's avatar
Marvin Scholz committed
747
            if (format_type == FORMAT_ERROR) {
748
                config_release_config();
749
                global_unlock();
750
                if (response) {
751
                    client_send_error_by_id(source->client, ICECAST_ERROR_CON_CONTENT_TYPE_NOSYS);
752
753
                    source->client = NULL;
                }
754
                ICECAST_LOG_WARN("Content-type \"%s\" not supported, dropping source", contenttype);
755
756
                return -1;
            }
757
758
759
760
        } else if (source->parser->req_type == httpp_req_put) {
            config_release_config();
            global_unlock();
            if (response) {
761
                client_send_error_by_id(source->client, ICECAST_ERROR_CON_NO_CONTENT_TYPE_GIVEN);
762
763
764
765
766
767
768
769
                source->client = NULL;
            }
            ICECAST_LOG_ERROR("Content-type not given in PUT request, dropping source");
            return -1;
        } else {
            ICECAST_LOG_ERROR("No content-type header, falling back to backwards compatibility mode "
                    "for icecast 1.x relays. Assuming content is mp3. This behaviour is deprecated "
                    "and the source client will NOT work with future Icecast versions!");
770
            format_type = FORMAT_TYPE_GENERIC;
771
772
        }

Marvin Scholz's avatar
Marvin Scholz committed
773
        if (format_get_plugin (format_type, source) < 0) {
774
775
            global_unlock();
            config_release_config();
Marvin Scholz's avatar
Marvin Scholz committed
776
            if (response) {
777
                client_send_error_by_id(source->client, ICECAST_ERROR_CON_INTERNAL_FORMAT_ALLOC_ERROR);
778
779
                source->client = NULL;
            }
780
            ICECAST_LOG_WARN("plugin format failed for \"%s\"", source->mount);
781
782
783
            return -1;
        }

784
        global.sources++;
Marvin Scholz's avatar
Marvin Scholz committed
785
        stats_event_args(NULL, "sources", "%d", global.sources);
786
        global_unlock();
787

788
        source->running = 1;
Marvin Scholz's avatar
Marvin Scholz committed
789
790
        mountinfo = config_find_mount(config, source->mount, MOUNT_TYPE_NORMAL);
        source_update_settings(config, source, mountinfo);
791
        config_release_config();
792
        slave_rebuild_mounts();
793
794

        source->shutdown_rwlock = &_source_shutdown_rwlock;
795
        ICECAST_LOG_DEBUG("source is ready to start");
796
797
798

        return 0;
    }
799
    ICECAST_LOG_WARN("Request to add source when maximum source limit "
Marvin Scholz's avatar
Marvin Scholz committed
800
        "reached %d", global.sources);
801
802
803
804

    global_unlock();
    config_release_config();

Marvin Scholz's avatar
Marvin Scholz committed
805
    if (response) {
806
        client_send_error_by_id(source->client, ICECAST_ERROR_CON_SOURCE_CLIENT_LIMIT);
807
808
        source->client = NULL;
    }
809
810
811
812

    return -1;
}

813
/* Reserve the mountpoint named by client->uri and start a source on it.
 *
 * client: the connecting source client; its uri, parser, con and protocol
 *         fields are read here.
 *
 * If the mountpoint cannot be reserved, a "mount in use" error is sent to
 * the client. If connection_complete_source() fails, the reserved source is
 * cleared and freed again (it is called with response enabled).
 * Otherwise the client is handed to source_client_callback(): directly for
 * SHOUTcast-protocol clients, or via fserve_add_client_callback() after the
 * HTTP response header has been placed in client->refbuf.
 */
static inline void source_startup(client_t *client)
{
    /* Handshake reply for SHOUTcast sources; the length passed to
     * client_send_bytes() is derived from this literal (without the NUL). */
    static const char shoutcast_ok[] = "OK2\r\nicy-caps:11\r\n\r\n";
    source_t *source;

    source = source_reserve(client->uri);
    if (!source) {
        /* Log before the client is handed over to the error sender, as
         * _handle_source_request() does. */
        ICECAST_LOG_WARN("Mountpoint %s in use", client->uri);
        client_send_error_by_id(client, ICECAST_ERROR_CON_MOUNT_IN_USE);
        return;
    }

    source->client = client;
    source->parser = client->parser;
    source->con = client->con;
    if (connection_complete_source(source, 1) < 0) {
        source_clear_source(source);
        source_free_source(source);
        return;
    }
    client->respcode = 200;

    if (client->protocol == ICECAST_PROTOCOL_SHOUTCAST) {
        /* send this non-blocking but if there is only a partial write
         * then leave to header timeout */
        client_send_bytes(client, shoutcast_ok, sizeof(shoutcast_ok) - 1);
        source->shoutcast_compat = 1;
        source_client_callback(client, source);
    } else {
        refbuf_t *ok;
        const char *expectcontinue;
        const char *transfer_encoding;
        int status_to_send = 200;
        ssize_t ret;

        /* Validate the transfer encoding before allocating the response
         * buffer so that the early return below does not leak it. */
        transfer_encoding = httpp_getvar(source->parser, "transfer-encoding");
        if (transfer_encoding && strcasecmp(transfer_encoding, HTTPP_ENCODING_IDENTITY) != 0) {
            client->encoding = httpp_encoding_new(transfer_encoding);
            if (!client->encoding) {
                /* NOTE(review): the source reserved above is not released
                 * on this path and still references the client - confirm
                 * whether additional cleanup is required here. */
                client_send_error_by_id(client, ICECAST_ERROR_CON_UNIMPLEMENTED);
                return;
            }
        }

        /* For PUT support we check for 100-continue and send back a 100 to stay in spec */
        expectcontinue = httpp_getvar (source->parser, "expect");

        if (expectcontinue != NULL) {
#ifdef HAVE_STRCASESTR
            if (strcasestr (expectcontinue, "100-continue") != NULL)
#else
            ICECAST_LOG_WARN("OS doesn't support case insensitive substring checks...");
            if (strstr (expectcontinue, "100-continue") != NULL)
#endif
            {
                status_to_send = 100;
            }
        }

        ok = refbuf_new(PER_CLIENT_REFBUF_SIZE);
        /* NOTE(review): ret is used unchecked as an offset into ok->data;
         * confirm util_http_build_header() cannot return a negative value
         * or one >= PER_CLIENT_REFBUF_SIZE here. */
        ret = util_http_build_header(ok->data, PER_CLIENT_REFBUF_SIZE, 0, 0, status_to_send, NULL, NULL, NULL, NULL, NULL, client);
        snprintf(ok->data + ret, PER_CLIENT_REFBUF_SIZE - ret, "Content-Length: 0\r\n\r\n");
        ok->len = strlen(ok->data);

        /* Swap the client's buffer for the freshly built response. */
        refbuf_release(client->refbuf);
        client->refbuf = ok;
        fserve_add_client_callback(client, source_client_callback, source);
    }
}

Philipp Schafft's avatar
Philipp Schafft committed
880
/* only called for native icecast source clients */
881
static void _handle_source_request(client_t *client)
Jack Moffitt's avatar
Jack Moffitt committed
882
{
883
    ICECAST_LOG_INFO("Source logging in at mountpoint \"%s\" from %s as role %s",
884
        client->uri, client->con->ip, client->role);
885

886
    if (client->uri[0] != '/') {
Philipp Schafft's avatar
Philipp Schafft committed
887
        ICECAST_LOG_WARN("source mountpoint not starting with /");
888
        client_send_error_by_id(client, ICECAST_ERROR_CON_MOUNTPOINT_NOT_STARTING_WITH_SLASH);
889
        return;
890
    }
891

892
    source_startup(client);
Philipp Schafft's avatar
Philipp Schafft committed
893
894
895
}


896
/* Answer a stats request: count the connection, write a bare
 * "200 OK" status line into the client's buffer and register the client
 * with fserve so that stats_callback is invoked for it.
 */
static void _handle_stats_request(client_t *client)
{
    refbuf_t *reply = client->refbuf;

    stats_event_inc(NULL, "stats_connections");

    client->respcode = 200;
    snprintf(reply->data, PER_CLIENT_REFBUF_SIZE, "HTTP/1.0 200 OK\r\n\r\n");
    reply->len = strlen(reply->data);

    fserve_add_client_callback(client, stats_callback, NULL);
}

Philipp Schafft's avatar
Philipp Schafft committed
907
908
909
/* if 0 is returned then the client should not be touched, however if -1
 * is returned then the caller is responsible for handling the client
 */
Marvin Scholz's avatar
Marvin Scholz committed
910
static int __add_listener_to_source(source_t *source, client_t *client)
911
{
Philipp Schafft's avatar
Philipp Schafft committed
912
    size_t loop = 10;
Michael Smith's avatar
Michael Smith committed
913

Marvin Scholz's avatar
Marvin Scholz committed
914
    do {
Philipp Schafft's avatar
Philipp Schafft committed
915
        ICECAST_LOG_DEBUG("max on %s is %ld (cur %lu)", source->mount,
Marvin Scholz's avatar
Marvin Scholz committed
916
            source->max_listeners, source->listeners);
Philipp Schafft's avatar
Philipp Schafft committed
917
918
919
920
        if (source->max_listeners == -1)
            break;
        if (source->listeners < (unsigned long)source->max_listeners)
            break;
921

Marvin Scholz's avatar
Marvin Scholz committed
922
        if (loop && source->fallback_when_full && source->fallback_mount) {