connection.c 23 KB
Newer Older
Jack Moffitt's avatar
Jack Moffitt committed
1
2
3
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
4
#include <time.h>
Jack Moffitt's avatar
Jack Moffitt committed
5
#include <sys/types.h>
6
#include <sys/stat.h>
7
8
9
#ifdef HAVE_POLL
#include <sys/poll.h>
#endif
10
11
12

#ifndef _WIN32
#include <sys/time.h>
Jack Moffitt's avatar
Jack Moffitt committed
13
14
#include <sys/socket.h>
#include <netinet/in.h>
15
#else
16
#include <winsock2.h>
17
18
#define snprintf _snprintf
#define strcasecmp stricmp
19
#define strncasecmp strnicmp
20
#endif
Jack Moffitt's avatar
Jack Moffitt committed
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

#include "os.h"

#include "thread.h"
#include "avl.h"
#include "sock.h"
#include "log.h"
#include "httpp.h"

#include "config.h"
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
38
#include "xslt.h"
39
#include "fserve.h"
40
41

#include "yp.h"
Jack Moffitt's avatar
Jack Moffitt committed
42
#include "source.h"
43
#include "geturl.h"
Michael Smith's avatar
Michael Smith committed
44
#include "format.h"
45
#include "format_mp3.h"
Michael Smith's avatar
Michael Smith committed
46
#include "event.h"
47
#include "admin.h"
Jack Moffitt's avatar
Jack Moffitt committed
48
49
50
51
52
53
54
55
56

#define CATMODULE "connection"

/* One node of the singly linked list of queued connections (see _queue). */
typedef struct con_queue_tag {
	connection_t *con;           /* the queued connection */
	struct con_queue_tag *next;  /* next node in the list, NULL at the end */
} con_queue_t;

typedef struct _thread_queue_tag {
57
	thread_type *thread_id;
Jack Moffitt's avatar
Jack Moffitt committed
58
59
60
61
62
63
64
65
66
67
68
69
70
	struct _thread_queue_tag *next;
} thread_queue_t;

/* Held while _current_id is read and incremented. */
static mutex_t _connection_mutex;
/* Next id handed out by _next_connection_id(). */
static unsigned long _current_id = 0;
/* Set to 1 by connection_initialize(), reset to 0 by connection_shutdown(). */
static int _initialized = 0;
/* Condition the handler threads wait on; signalled when work is queued. */
static cond_t _pool_cond;

/* Head of the list of queued connections. */
static con_queue_t *_queue = NULL;
/* Held while _queue is modified; _push_thread()/_pop_thread() take it too. */
static mutex_t _queue_mutex;

/* List of handler threads created by _build_pool(). */
static thread_queue_t *_conhands = NULL;

71
/* Handed to each new source as source->shutdown_rwlock; connection_accept_loop()
 * takes the write lock at exit to wait for the sources to shut down.
 */
rwlock_t _source_shutdown_rwlock;
Jack Moffitt's avatar
Jack Moffitt committed
72
73
74
75
76
77
78
79
80
81
82

static void *_handle_connection(void *arg);

void connection_initialize(void)
{
	if (_initialized) return;
	
	thread_mutex_create(&_connection_mutex);
	thread_mutex_create(&_queue_mutex);
	thread_rwlock_create(&_source_shutdown_rwlock);
	thread_cond_create(&_pool_cond);
83
    thread_cond_create(&global.shutdown_cond);
Jack Moffitt's avatar
Jack Moffitt committed
84
85
86
87
88
89
90
91

	_initialized = 1;
}

void connection_shutdown(void)
{
	if (!_initialized) return;
	
92
    thread_cond_destroy(&global.shutdown_cond);
Jack Moffitt's avatar
Jack Moffitt committed
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
	thread_cond_destroy(&_pool_cond);
	thread_rwlock_destroy(&_source_shutdown_rwlock);
	thread_mutex_destroy(&_queue_mutex);
	thread_mutex_destroy(&_connection_mutex);

	_initialized = 0;
}

/* Return the current connection id and advance the counter, under
 * _connection_mutex.
 */
static unsigned long _next_connection_id(void)
{
    unsigned long assigned;

    thread_mutex_lock(&_connection_mutex);
    assigned = _current_id;
    _current_id = assigned + 1;
    thread_mutex_unlock(&_connection_mutex);

    return assigned;
}

112
113
114
115
116
117
118
119
/* Allocate and initialise a connection_t for an accepted socket.
 *
 * sock  socket to attach to the connection
 * ip    address string; the pointer is stored as-is in con->ip, and
 *       connection_close() later frees it
 *
 * Returns the new connection, or NULL when memory cannot be allocated.
 * On NULL the caller still owns both sock and ip.
 */
connection_t *create_connection(sock_t sock, char *ip) {
    /* calloc gives the same zero-filled structure the old malloc+memset did */
    connection_t *con = calloc(1, sizeof(connection_t));

    if (con == NULL)
        return NULL;

    con->sock = sock;
    con->con_time = time(NULL);
    con->id = _next_connection_id();
    con->ip = ip;

    /* a real connection carries no injected event */
    con->event_number = EVENT_NO_EVENT;
    con->event = NULL;

    return con;
}

127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/* Wait until one of the listening sockets in global.serversock[] is readable.
 *
 * timeout  maximum wait in milliseconds; in the select() build a negative
 *          value means wait without a timeout
 *
 * Returns the descriptor of a readable listening socket, -1 when the wait
 * timed out (or no readable socket was found), -2 when poll()/select() failed.
 */
static int wait_for_serversock(int timeout)
{
#ifdef HAVE_POLL
    struct pollfd ufds[MAX_LISTEN_SOCKETS];
    int i, ret;

    for(i=0; i < global.server_sockets; i++) {
        ufds[i].fd = global.serversock[i];
        ufds[i].events = POLLIN;
        ufds[i].revents = 0;
    }

    ret = poll(ufds, global.server_sockets, timeout);
    if(ret < 0) {
        return -2;
    }
    else if(ret == 0) {
        return -1;
    }
    else {
        for(i=0; i < global.server_sockets; i++) {
            /* revents is a bit mask: test the POLLIN bit rather than
             * comparing for equality, so a socket that also reports
             * other flags is still picked up */
            if(ufds[i].revents & POLLIN)
                return ufds[i].fd;
        }
        return -1; /* Shouldn't happen */
    }
#else
    fd_set rfds;
    struct timeval tv, *p=NULL;
    int i, ret;
    int max = -1;

    FD_ZERO(&rfds);

    for(i=0; i < global.server_sockets; i++) {
        FD_SET(global.serversock[i], &rfds);
        if(global.serversock[i] > max)
            max = global.serversock[i];
    }

    if(timeout >= 0) {
        tv.tv_sec = timeout/1000;
        /* remaining milliseconds converted to microseconds */
        tv.tv_usec = (timeout % 1000)*1000;
        p = &tv;
    }

    ret = select(max+1, &rfds, NULL, NULL, p);
    if(ret < 0) {
        return -2;
    }
    else if(ret == 0) {
        return -1;
    }
    else {
        for(i=0; i < global.server_sockets; i++) {
            if(FD_ISSET(global.serversock[i], &rfds))
                return global.serversock[i];
        }
        return -1; /* Should be impossible, stop compiler warnings */
    }
#endif
}

Jack Moffitt's avatar
Jack Moffitt committed
190
191
192
193
194
static connection_t *_accept_connection(void)
{
	int sock;
	connection_t *con;
	char *ip;
195
    int serversock; 
Jack Moffitt's avatar
Jack Moffitt committed
196

197
198
199
    serversock = wait_for_serversock(100);
    if(serversock < 0)
        return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
200

201
	/* malloc enough room for a full IP address (including ipv6) */
202
	ip = (char *)malloc(MAX_ADDR_LEN);
Jack Moffitt's avatar
Jack Moffitt committed
203

204
	sock = sock_accept(serversock, ip, MAX_ADDR_LEN);
Jack Moffitt's avatar
Jack Moffitt committed
205
	if (sock >= 0) {
206
		con = create_connection(sock, ip);
Jack Moffitt's avatar
Jack Moffitt committed
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237

		return con;
	}

	if (!sock_recoverable(sock_error()))
		WARN2("accept() failed with error %d: %s", sock_error(), strerror(sock_error()));
	
	free(ip);

	return NULL;
}

/* Put con at the head of the queued-connection list.
 * _get_connection() removes from the tail, so connections are handled in
 * arrival order.
 */
static void _add_connection(connection_t *con)
{
    con_queue_t *entry = (con_queue_t *)malloc(sizeof(con_queue_t));

    thread_mutex_lock(&_queue_mutex);
    entry->con = con;
    entry->next = _queue;
    _queue = entry;
    thread_mutex_unlock(&_queue_mutex);
}

/* Signal _pool_cond, the condition the handler threads wait on in
 * _handle_connection(). Presumably wakes a single waiter -- confirm against
 * thread_cond_signal().
 */
static void _signal_pool(void)
{
	thread_cond_signal(&_pool_cond);
}

238
/* Push thread_id onto the front of the list *queue, under _queue_mutex. */
static void _push_thread(thread_queue_t **queue, thread_type *thread_id)
{
    thread_queue_t *entry = (thread_queue_t *)malloc(sizeof(thread_queue_t));

    entry->thread_id = thread_id;
    entry->next = NULL;

    thread_mutex_lock(&_queue_mutex);
    /* linking to the old head also covers the empty list (old head is NULL) */
    entry->next = *queue;
    *queue = entry;
    thread_mutex_unlock(&_queue_mutex);
}

256
/* Remove the first entry of the list *queue, under _queue_mutex.
 * Returns its thread handle, or NULL when the list is empty.
 */
static thread_type *_pop_thread(thread_queue_t **queue)
{
    thread_type *id = NULL;
    thread_queue_t *head;

    thread_mutex_lock(&_queue_mutex);
    head = *queue;
    if (head != NULL) {
        *queue = head->next;
        id = head->thread_id;
        free(head);
    }
    thread_mutex_unlock(&_queue_mutex);

    return id;
}

static void _build_pool(void)
{
	ice_config_t *config;
Michael Smith's avatar
Michael Smith committed
282
	int i;
283
    thread_type *tid;
Jack Moffitt's avatar
Jack Moffitt committed
284
	char buff[64];
Michael Smith's avatar
Michael Smith committed
285
    int threadpool_size;
Jack Moffitt's avatar
Jack Moffitt committed
286
287

	config = config_get_config();
Michael Smith's avatar
Michael Smith committed
288
289
    threadpool_size = config->threadpool_size;
    config_release_config();
Jack Moffitt's avatar
Jack Moffitt committed
290

Michael Smith's avatar
Michael Smith committed
291
	for (i = 0; i < threadpool_size; i++) {
Jack Moffitt's avatar
Jack Moffitt committed
292
		snprintf(buff, 64, "Connection Thread #%d", i);
293
		tid = thread_create(buff, _handle_connection, NULL, THREAD_ATTACHED);
Jack Moffitt's avatar
Jack Moffitt committed
294
295
296
297
298
299
		_push_thread(&_conhands, tid);
	}
}

static void _destroy_pool(void)
{
300
	thread_type *id;
Jack Moffitt's avatar
Jack Moffitt committed
301
302
303
304
305
306
	int i;

	i = 0;

	thread_cond_broadcast(&_pool_cond);
	id = _pop_thread(&_conhands);
Michael Smith's avatar
Michael Smith committed
307
	while (id != NULL) {
Jack Moffitt's avatar
Jack Moffitt committed
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
		thread_join(id);
		_signal_pool();
		id = _pop_thread(&_conhands);
	}
}

/* Build the handler pool, then accept connections and queue them for the pool
 * for as long as global.running is ICE_RUNNING. On exit, broadcast the
 * shutdown condition, tear the pool down and wait for the sources.
 */
void connection_accept_loop(void)
{
    connection_t *incoming;

    _build_pool();

    while (global.running == ICE_RUNNING) {
        incoming = _accept_connection();
        if (incoming == NULL)
            continue;

        _add_connection(incoming);
        _signal_pool();
    }

    /* Give all the other threads notification to shut down */
    thread_cond_broadcast(&global.shutdown_cond);

    _destroy_pool();

    /* wait for all the sources to shutdown */
    thread_rwlock_wlock(&_source_shutdown_rwlock);
    thread_rwlock_unlock(&_source_shutdown_rwlock);
}

/* Detach the last node of the queued-connection list (the oldest entry,
 * since _add_connection() inserts at the head) and return its connection.
 * Returns NULL when the list is empty.
 */
static connection_t *_get_connection(void)
{
    con_queue_t **link;
    con_queue_t *tail = NULL;
    connection_t *con = NULL;

    thread_mutex_lock(&_queue_mutex);
    if (_queue != NULL) {
        /* walk to the pointer that refers to the final node */
        link = &_queue;
        while ((*link)->next != NULL)
            link = &(*link)->next;

        tail = *link;
        *link = NULL;
    }
    thread_mutex_unlock(&_queue_mutex);

    if (tail != NULL) {
        con = tail->con;
        free(tail);
    }

    return con;
}

Michael Smith's avatar
Michael Smith committed
369
370
371
372
373
374
375
376
377
378
/* Queue an event for the handler pool, wrapped in a zero-filled connection_t
 * that carries only the event number and its data.
 *
 * eventnum    event number; _handle_connection() treats values > 0 as events
 * event_data  opaque pointer passed to the event handler
 *
 * If memory cannot be allocated the event is dropped and an error is logged.
 */
void connection_inject_event(int eventnum, void *event_data) {
    connection_t *con = calloc(1, sizeof(connection_t));

    if (con == NULL) {
        ERROR1("Out of memory, dropping event %d", eventnum);
        return;
    }

    con->event_number = eventnum;
    con->event = event_data;

    _add_connection(con);
    _signal_pool();
}

379
380
381
/* TODO: Make this return an appropriate error code so that we can use HTTP
 * codes where appropriate
 */
382
/* Register a new source on mount and start its thread.
 *
 * The global source count is checked against config->source_limit and
 * incremented first. The mount list is then searched for a matching entry,
 * which is passed to source_create(). The stream format comes from the
 * content-type header, or is assumed to be mp3 when the header is missing.
 *
 * Returns 1 when the source thread was started, 0 when the source limit is
 * reached or the content type is not supported. In the latter case the
 * source count and "sources" stat are rolled back.
 */
int connection_create_source(client_t *client, connection_t *con, http_parser_t *parser, char *mount) {
    source_t *source;
    char *contenttype;
    format_type_t format;
    mount_proxy *mountproxy, *mountinfo = NULL;
    int source_limit;
    ice_config_t *config;

    config = config_get_config();
    source_limit = config->source_limit;
    config_release_config();

    /* check to make sure this source wouldn't
    ** be over the limit
    */
    global_lock();
    if (global.sources >= source_limit) {
        INFO1("Source (%s) logged in, but there are too many sources", mount);
        global_unlock();
        return 0;
    }
    global.sources++;
    global_unlock();

    stats_event_inc(NULL, "sources");

    /* take the mounts lock before letting go of the configuration */
    config = config_get_config();
    mountproxy = config->mounts;
    thread_mutex_lock(&(config_locks()->mounts_lock));
    config_release_config();

    /* look for a mount entry whose name matches exactly */
    for (; mountproxy != NULL; mountproxy = mountproxy->next) {
        if (strcmp(mountproxy->mountname, mount) == 0) {
            mountinfo = mountproxy;
            break;
        }
    }

    contenttype = httpp_getvar(parser, "content-type");

    if (contenttype == NULL) {
        ERROR0("No content-type header, falling back to backwards compatibility mode for icecast 1.x relays. Assuming content is mp3.");
        format = FORMAT_TYPE_MP3;
    } else {
        format = format_get_type(contenttype);
        if (format == FORMAT_ERROR) {
            WARN1("Content-type \"%s\" not supported, dropping source", contenttype);
            thread_mutex_unlock(&(config_locks()->mounts_lock));
            goto fail;
        }
    }

    /* mountinfo is used under the mounts lock, released right after */
    source = source_create(client, con, parser, mount, format, mountinfo);
    thread_mutex_unlock(&(config_locks()->mounts_lock));

    source->send_return = 1;
    source->shutdown_rwlock = &_source_shutdown_rwlock;
    sock_set_blocking(con->sock, SOCK_NONBLOCK);
    thread_create("Source Thread", source_main, (void *)source, THREAD_DETACHED);
    return 1;

fail:
    /* undo the count taken above */
    global_lock();
    global.sources--;
    global_unlock();

    stats_event_dec(NULL, "sources");
    return 0;
}

455
456
/* Check HTTP Basic credentials from the "authorization" header.
 *
 * The header must start with "Basic " followed by base64 of "user:password".
 * Returns 1 when both parts equal correctuser and correctpass, otherwise 0
 * (missing header, other scheme, decode failure, no ':' or a mismatch).
 */
static int _check_pass_http(http_parser_t *parser, 
        char *correctuser, char *correctpass)
{
    /* This will look something like "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==" */
    char *header = httpp_getvar(parser, "authorization");
    char *decoded, *colon;
    int matched;

    if (header == NULL || strncmp(header, "Basic ", 6) != 0)
        return 0;

    decoded = util_base64_decode(header+6);
    if (decoded == NULL) {
        WARN1("Base64 decode of Authorization header \"%s\" failed",
                header+6);
        return 0;
    }

    colon = strchr(decoded, ':');
    if (colon == NULL) {
        free(decoded);
        return 0;
    }

    /* split in place: user name before the colon, password after it */
    *colon = 0;
    matched = (strcmp(decoded, correctuser) == 0 &&
               strcmp(colon + 1, correctpass) == 0);

    free(decoded);
    return matched;
}

494
495
496
497
498
499
500
501
502
503
504
505
506
507
/* Check the password taken from the HTTPP_VAR_ICYPASSWORD parser variable.
 * Returns 1 when it equals correctpass, 0 when it differs or is absent.
 */
static int _check_pass_icy(http_parser_t *parser, char *correctpass)
{
    char *supplied = httpp_getvar(parser, HTTPP_VAR_ICYPASSWORD);

    if (supplied == NULL)
        return 0;

    return strcmp(supplied, correctpass) == 0;
}

508
/* Check the password from the "ice-password" header. A missing header is
 * treated as an empty password.
 * Returns 1 when it equals correctpass, otherwise 0.
 */
static int _check_pass_ice(http_parser_t *parser, char *correctpass)
{
    char *supplied = httpp_getvar(parser, "ice-password");

    if (supplied == NULL)
        supplied = "";

    return strcmp(supplied, correctpass) == 0;
}

522
/* Check the request's HTTP Basic credentials against the configured admin
 * username and password.
 *
 * Returns 1 when they match, 0 when they do not or when either admin
 * credential is not configured.
 *
 * The configuration stays held until the comparison is done, because user
 * and pass point into it.
 * NOTE(review): this assumes _check_pass_http() and the helpers it calls
 * never call config_get_config() themselves -- confirm.
 */
int connection_check_admin_pass(http_parser_t *parser)
{
    ice_config_t *config = config_get_config();
    char *pass = config->admin_password;
    char *user = config->admin_username;
    int ret = 0;

    if (pass != NULL && user != NULL)
        ret = _check_pass_http(parser, user, pass);

    config_release_config();

    return ret;
}

535
/* Check the credentials a source supplied for mount.
 *
 * The expected password defaults to config->source_password and the user to
 * "source"; a mount entry with the same name may override either. Requests
 * whose protocol is "ICY" are checked with the ICY password variable. All
 * others use HTTP Basic, falling back to the "ice-password" header when
 * config->ice_login is set.
 *
 * Returns 1 when the credentials match, 0 otherwise (including when no
 * password is configured).
 *
 * Both the configuration and the mounts lock stay held until the comparison
 * is done, because user and pass point into data they guard. The acquisition
 * order (configuration first, then mounts) is unchanged.
 * NOTE(review): this assumes the _check_pass_*() helpers never take either
 * lock themselves -- confirm.
 */
int connection_check_source_pass(http_parser_t *parser, char *mount)
{
    ice_config_t *config = config_get_config();
    char *pass = config->source_password;
    char *user = "source";
    int ret;
    int ice_login = config->ice_login;
    char *protocol;
    mount_proxy *mountinfo = config->mounts;

    thread_mutex_lock(&(config_locks()->mounts_lock));

    while(mountinfo) {
        if(!strcmp(mountinfo->mountname, mount)) {
            if(mountinfo->password)
                pass = mountinfo->password;
            if(mountinfo->username)
                user = mountinfo->username;
            break;
        }
        mountinfo = mountinfo->next;
    }

    if(!pass) {
        WARN0("No source password set, rejecting source");
        ret = 0;
    }
    else {
        protocol = httpp_getvar(parser, HTTPP_VAR_PROTOCOL);
        if(protocol != NULL && !strcmp(protocol, "ICY")) {
            ret = _check_pass_icy(parser, pass);
        }
        else {
            ret = _check_pass_http(parser, user, pass);
            if(!ret && ice_login)
            {
                ret = _check_pass_ice(parser, pass);
                if(ret)
                    WARN0("Source is using deprecated icecast login");
            }
        }
    }

    thread_mutex_unlock(&(config_locks()->mounts_lock));
    config_release_config();

    return ret;
}

582
583
584
/* Handle a source login for the mountpoint uri.
 *
 * A client is created for the connection, then the credentials are checked
 * (401 on failure), the mountpoint must not already be in use (404), and
 * finally the source is created (404 if that fails).
 */
static void _handle_source_request(connection_t *con, 
        http_parser_t *parser, char *uri)
{
    client_t *client = client_create(con, parser);
    int mount_taken;

    INFO1("Source logging in at mountpoint \"%s\"", uri);
    stats_event_inc(NULL, "source_connections");

    if (!connection_check_source_pass(parser, uri)) {
        INFO1("Source (%s) attempted to login with invalid or missing password", uri);
        client_send_401(client);
        return;
    }

    /* check to make sure this source has
    ** a unique mountpoint
    */
    avl_tree_rlock(global.source_tree);
    mount_taken = (source_find_mount(uri) != NULL);
    avl_tree_unlock(global.source_tree);

    if (mount_taken) {
        INFO1("Source tried to log in as %s, but mountpoint is already used", uri);
        client_send_404(client, "Mountpoint in use");
        return;
    }

    if (!connection_create_source(client, con, parser, uri))
        client_send_404(client, "Mountpoint in use");
}

/* Handle a stats request.
 *
 * The admin credentials are checked first. On success the connection and
 * parser are handed to a detached "Stats Connection" thread. On a bad
 * password or allocation failure the connection is closed and the parser
 * destroyed here. uri is not used.
 */
static void _handle_stats_request(connection_t *con, 
        http_parser_t *parser, char *uri)
{
    stats_connection_t *stats;

    stats_event_inc(NULL, "stats_connections");

    if (!connection_check_admin_pass(parser)) {
        ERROR0("Bad password for stats connection");
        connection_close(con);
        httpp_destroy(parser);
        return;
    }

    /* allocate before counting the connection, so a failure leaves the
     * "stats" counter untouched */
    stats = malloc(sizeof(*stats));
    if (stats == NULL) {
        ERROR0("Out of memory creating stats connection");
        connection_close(con);
        httpp_destroy(parser);
        return;
    }

    stats_event_inc(NULL, "stats");

    /* create stats connection and create stats handler thread */
    stats->parser = parser;
    stats->con = con;

    thread_create("Stats Connection", stats_connection, (void *)stats, THREAD_DETACHED);
}

/* Handle an HTTP GET for the normalised path uri.
 *
 * The request is tried against these cases, in order:
 *   1. "/admin/..."           -> admin_handle_request()
 *   2. an XSLT file           -> transformed stats page, or 404 if missing
 *   3. an existing file       -> file serving (only if fileserve is enabled)
 *   4. a ".m3u" request       -> generated playlist for a live mount, else a
 *                                file (if fileserve is enabled), else 404
 *   5. a live mountpoint      -> attach the client as a listener, subject to
 *                                the global and per-mount limits (504)
 *   6. anything else          -> 404
 */
static void _handle_get_request(connection_t *con,
        http_parser_t *parser, char *uri)
{
    char *fullpath;
    client_t *client;
    int bytes;
    struct stat statbuf;
    source_t *source;
    int fileserve;
    char *host;
    int port;
    ice_config_t *config;
    int client_limit;
    char *reject;

    /* snapshot the settings this request needs */
    config = config_get_config();
    fileserve = config->fileserve;
    host = config->hostname;
    port = config->port;
    client_limit = config->client_limit;
    config_release_config();

    DEBUG0("Client connected");

    /* make a client */
    client = client_create(con, parser);
    stats_event_inc(NULL, "client_connections");

    /* there are several types of HTTP GET clients
    ** media clients, which are looking for a source (eg, URI = /stream.ogg)
    ** stats clients, which are looking for /admin/stats.xml
    ** and director server authorizers, which are looking for /GUID-xxxxxxxx
    ** (where xxxxxx is the GUID in question) - this isn't implemented yet.
    ** we need to handle the latter two before the former, as the latter two
    ** aren't subject to the limits.
    */
    /* TODO: add GUID-xxxxxx */

    /* Dispatch all admin requests */
    if (strncmp(uri, "/admin/", 7) == 0) {
        admin_handle_request(client, uri);
        return;
    }

    /* Here we are parsing the URI request to see
    ** if the extension is .xsl, if so, then process
    ** this request as an XSLT request
    */
    fullpath = util_get_path_from_normalised_uri(uri);
    if (util_check_valid_extension(fullpath) == XSLT_CONTENT) {
        /* If the file exists, then transform it, otherwise, write a 404 */
        if (stat(fullpath, &statbuf) != 0) {
            client_send_404(client, "The file you requested could not be found");
        }
        else {
            DEBUG0("Stats request, sending XSL transformed stats");
            client->respcode = 200;
            bytes = sock_write(client->con->sock, 
                    "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\n");
            if (bytes > 0)
                client->con->sent_bytes = bytes;
            stats_transform_xslt(client, fullpath);
            client_destroy(client);
        }
        free(fullpath);
        return;
    }

    /* plain file on disk, only when file serving is enabled */
    if (fileserve && stat(fullpath, &statbuf) == 0) {
        fserve_client_create(client, fullpath);
        free(fullpath);
        return;
    }
    free(fullpath);

    if (strcmp(util_get_extension(uri), "m3u") == 0) {
        /* sourceuri is the request path with its last ".ext" cut off */
        char *sourceuri = strdup(uri);
        char *dot = strrchr(sourceuri, '.');
        *dot = 0;

        avl_tree_rlock(global.source_tree);
        source = source_find_mount(sourceuri);
        if (source) {
            /* a live mount: answer with a one-line playlist pointing at it */
            client->respcode = 200;
            bytes = sock_write(client->con->sock,
                    "HTTP/1.0 200 OK\r\n"
                    "Content-Type: audio/x-mpegurl\r\n\r\n"
                    "http://%s:%d%s", 
                    host, 
                    port,
                    sourceuri
                    );
            if (bytes > 0)
                client->con->sent_bytes = bytes;
            client_destroy(client);
        }
        else if (fileserve) {
            /* try the path without the extension first, then the path
             * exactly as requested */
            fullpath = util_get_path_from_normalised_uri(sourceuri);
            if (stat(fullpath, &statbuf) != 0) {
                free(fullpath);
                fullpath = util_get_path_from_normalised_uri(uri);
                if (stat(fullpath, &statbuf) != 0) {
                    free(fullpath);
                    fullpath = NULL;
                }
            }

            if (fullpath != NULL) {
                fserve_client_create(client, fullpath);
                free(fullpath);
            }
            else {
                client_send_404(client, 
                        "The file you requested could not be found");
            }
        }
        else {
            client_send_404(client, "The file you requested could not be found");
        }
        avl_tree_unlock(global.source_tree);
        free(sourceuri);
        return;
    }

    /* early check of the global listener limit */
    global_lock();
    if (global.clients >= client_limit) {
        client_send_504(client,
                "The server is already full. Try again later.");
        global_unlock();
        return;
    }
    global_unlock();

    avl_tree_rlock(global.source_tree);
    source = source_find_mount(uri);
    if (source == NULL) {
        avl_tree_unlock(global.source_tree);
        DEBUG0("Source not found for client");
        client_send_404(client, "The source you requested could not be found.");
        return;
    }

    DEBUG0("Source found for client");

    /* re-check the limits with the global lock held, then count the client */
    global_lock();
    reject = NULL;
    if (global.clients >= client_limit)
        reject = "The server is already full. Try again later.";
    else if (source->max_listeners != -1 &&
             source->listeners >= source->max_listeners)
        reject = "Too many clients on this mountpoint. Try again later.";

    if (reject != NULL) {
        client_send_504(client, reject);
        global_unlock();
        avl_tree_unlock(global.source_tree);
        return;
    }
    global.clients++;
    global_unlock();

    client->format_data = source->format->create_client_data(
            source->format, source, client);

    source->format->client_send_headers(source->format, source, client);

    /* blank line that ends the response headers */
    bytes = sock_write(client->con->sock, "\r\n");
    if (bytes > 0)
        client->con->sent_bytes += bytes;

    sock_set_blocking(client->con->sock, SOCK_NONBLOCK);
    sock_set_nodelay(client->con->sock);

    /* hand the client to the source through its pending tree */
    avl_tree_wlock(source->pending_tree);
    avl_insert(source->pending_tree, (void *)client);
    avl_tree_unlock(source->pending_tree);

    avl_tree_unlock(global.source_tree);
}

static void *_handle_connection(void *arg)
{
	char header[4096];
	connection_t *con;
	http_parser_t *parser;
824
    char *rawuri, *uri;
825
	client_t *client;
Jack Moffitt's avatar
Jack Moffitt committed
826
827
828
829
830
831
832
833

	while (global.running == ICE_RUNNING) {
		memset(header, 0, 4096);

		thread_cond_wait(&_pool_cond);
		if (global.running != ICE_RUNNING) break;

		/* grab a connection and set the socket to blocking */
834
		while ((con = _get_connection())) {
Michael Smith's avatar
Michael Smith committed
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849

            /* Handle meta-connections */
            if(con->event_number > 0) {
                switch(con->event_number) {
                    case EVENT_CONFIG_READ:
                        event_config_read(con->event);
                        break;
                    default:
                        ERROR1("Unknown event number: %d", con->event_number);
                        break;
                }
                free(con);
                continue;
            }

850
			stats_event_inc(NULL, "connections");
Jack Moffitt's avatar
Jack Moffitt committed
851

852
			sock_set_blocking(con->sock, SOCK_BLOCK);
Jack Moffitt's avatar
Jack Moffitt committed
853

854
855
856
			/* fill header with the http header */
			if (util_read_header(con->sock, header, 4096) == 0) {
				/* either we didn't get a complete header, or we timed out */
Jack Moffitt's avatar
Jack Moffitt committed
857
858
859
860
				connection_close(con);
				continue;
			}

861
862
863
864
			parser = httpp_create_parser();
			httpp_initialize(parser, NULL);
			if (httpp_parse(parser, header, strlen(header))) {
				/* handle the connection or something */
Jack Moffitt's avatar
Jack Moffitt committed
865
				
866
867
				if (strcmp("ICE",  httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) &&
                    strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) {
868
                    ERROR0("Bad HTTP protocol detected");
Jack Moffitt's avatar
Jack Moffitt committed
869
870
871
872
873
					connection_close(con);
					httpp_destroy(parser);
					continue;
				}

874
875
876
877
878
                rawuri = httpp_getvar(parser, HTTPP_VAR_URI);
                uri = util_normalise_uri(rawuri);

                if(!uri) {
					client = client_create(con, parser);
879
                    client_send_404(client, "The path you requested was invalid");
880
881
                    continue;
                }
882

883
				if (parser->req_type == httpp_req_source) {
884
885
886
887
888
889
890
891
892
                    _handle_source_request(con, parser, uri);
                }
                else if (parser->req_type == httpp_req_stats) {
                    _handle_stats_request(con, parser, uri);
                }
                else if (parser->req_type == httpp_req_get) {
                    _handle_get_request(con, parser, uri);
                }
                else {
893
                    ERROR0("Wrong request type from client");
894
895
					connection_close(con);
					httpp_destroy(parser);
896
897
898
                }

                free(uri);
899
900
901
			} 
            else if(httpp_parse_icy(parser, header, strlen(header))) {
                /* TODO: Map incoming icy connections to /icy_0, etc. */
902
903
904
905
906
907
908
909
910
911
912
913
                char mount[20];
                int i = 0;

                strcpy(mount, "/");

                avl_tree_rlock(global.source_tree);
                while(source_find_mount(mount) != NULL) {
                    sprintf(mount, "/icy_%d", i++);
                }
                avl_tree_unlock(global.source_tree);

                _handle_source_request(con, parser, mount);
914
915
            }
            else {
916
                ERROR0("HTTP request parsing failed");
Jack Moffitt's avatar
Jack Moffitt committed
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
				connection_close(con);
				httpp_destroy(parser);
				continue;
			}
		}
	}

	thread_exit(0);

	return NULL;
}

/* Close the connection's socket and free the connection, including its ip
 * and host strings. con must not be used afterwards.
 */
void connection_close(connection_t *con)
{
    sock_close(con->sock);

    /* free(NULL) is a no-op, so unset strings need no guard */
    free(con->ip);
    free(con->host);
    free(con);
}