source.c 17.5 KB
Newer Older
Jack Moffitt's avatar
Jack Moffitt committed
1 2 3 4
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#include <ogg/ogg.h>

#ifndef _WIN32
#include <unistd.h>
#include <sys/time.h>
#include <sys/socket.h>
#else
#include <winsock2.h>
#include <windows.h>
#endif

#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "sock.h"

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "log.h"
#include "logging.h"
#include "config.h"
#include "util.h"
#include "geturl.h"
#include "source.h"
#include "format.h"
Jack Moffitt's avatar
Jack Moffitt committed
33

34 35 36
/* Redefine CATMODULE for this translation unit.
 * NOTE(review): presumably picked up by the logging macros used below
 * (WARN0, DEBUG0, INFO0, ...) as the message category -- confirm in logging.h.
 */
#undef CATMODULE
#define CATMODULE "source"

Jack Moffitt's avatar
Jack Moffitt committed
37 38 39 40 41
/* avl tree helpers for the per-source client trees:
 *   _compare_clients - ordering callback given to avl_tree_new()
 *   _remove_client   - key callback for avl_delete() when the client is
 *                      only being moved to another tree
 *   _free_client     - key callback for avl_delete()/avl_tree_free() when
 *                      the client is to be destroyed
 */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _remove_client(void *key);
static int _free_client(void *key);

42
/* Allocate and initialise a source_t for a newly connected stream source.
 *
 * client, con, parser: stored in the new source as given (not copied).
 * mount:  mountpoint name; duplicated, so the caller keeps ownership of
 *         its own string.
 * type:   handed to format_get_plugin() to obtain the format plugin.
 *
 * The source starts with running set to 1, no listeners, no fallback mount
 * and two empty client trees (active and pending).  One ypdata entry is
 * created for every configured YP directory slot that has a URL set.
 *
 * Returns the new source, or NULL if the source structure or the copy of
 * the mount name could not be allocated.
 *
 * NOTE(review): fields not assigned here (e.g. shutdown_rwlock, which
 * source_main() locks) are left uninitialised; presumably the caller sets
 * them -- confirm.
 */
source_t *source_create(client_t *client, connection_t *con, http_parser_t *parser, const char *mount, format_type_t type)
{
    int i;
    source_t *src;

    src = malloc(sizeof *src);
    if (src == NULL)
        return NULL;

    src->client = client;
    src->mount = strdup(mount);
    if (src->mount == NULL) {
        free(src);
        return NULL;
    }
    src->fallback_mount = NULL;
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
    src->running = 1;
    src->num_yp_directories = 0;
    src->listeners = 0;

    /* NOTE(review): the capacity of src->ypdata[] is not visible here and
     * is not checked against the configured number of directories. */
    for (i = 0; i < config_get_config()->num_yp_directories; i++) {
        int slot = src->num_yp_directories;

        /* slots without a URL are not in use */
        if (!config_get_config()->yp_url[i])
            continue;

        src->ypdata[slot] = yp_create_ypdata();
        if (src->ypdata[slot] == NULL)
            continue;   /* skip this directory rather than write through NULL */

        src->ypdata[slot]->yp_url = config_get_config()->yp_url[i];
        src->ypdata[slot]->yp_url_timeout =
            config_get_config()->yp_url_timeout[i];
        src->ypdata[slot]->yp_touch_interval = 0;
        src->num_yp_directories++;
    }

    return src;
}

/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
	source_t *source;
	avl_node *node;
	int cmp;

83 84 85
	if (!mount) {
		return NULL;
	}
Jack Moffitt's avatar
Jack Moffitt committed
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
	/* get the root node */
	node = global.source_tree->root->right;
	
	while (node) {
		source = (source_t *)node->key;
		cmp = strcmp(mount, source->mount);
		if (cmp < 0) 
			node = node->left;
		else if (cmp > 0)
			node = node->right;
		else
			return source;
	}
	
	/* didn't find it */
	return NULL;
}

/* Compare two source_t keys by mount name.
 *
 * arg is unused.  Returns the strcmp() result of a's mount against b's
 * mount (negative, zero or positive).
 */
int source_compare_sources(void *arg, void *a, void *b)
{
    const char *mount_a = ((source_t *)a)->mount;
    const char *mount_b = ((source_t *)b)->mount;

    return strcmp(mount_a, mount_b);
}

int source_free_source(void *key)
{
	source_t *source = (source_t *)key;
115
	int i=0;
Jack Moffitt's avatar
Jack Moffitt committed
116 117

	free(source->mount);
Michael Smith's avatar
Michael Smith committed
118
    free(source->fallback_mount);
119
    client_destroy(source->client);
Jack Moffitt's avatar
Jack Moffitt committed
120 121
	avl_tree_free(source->pending_tree, _free_client);
	avl_tree_free(source->client_tree, _free_client);
122
	source->format->free_plugin(source->format);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
123 124 125
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
Jack Moffitt's avatar
Jack Moffitt committed
126 127 128 129 130 131
	free(source);

	return 1;
}
	

132 133 134
/* The caller MUST have a current write lock on global.source_tree when calling
 * this
 */
Jack Moffitt's avatar
Jack Moffitt committed
135 136 137
void *source_main(void *arg)
{
	source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
138
    source_t *fallback_source;
Jack Moffitt's avatar
Jack Moffitt committed
139 140
	char buffer[4096];
	long bytes, sbytes;
141
	int ret, timeout;
Jack Moffitt's avatar
Jack Moffitt committed
142 143
	client_t *client;
	avl_node *client_node;
Jack Moffitt's avatar
Jack Moffitt committed
144
	char *s;
145 146
	long current_time;
	char current_song[256];
Jack Moffitt's avatar
Jack Moffitt committed
147 148 149 150 151

	refbuf_t *refbuf, *abuf;
	int data_done;

	int listeners = 0;
152
	int	i=0;
153
	int	suppress_yp = 0;
Jack Moffitt's avatar
Jack Moffitt committed
154

155 156
	timeout = config_get_config()->source_timeout;

Jack Moffitt's avatar
Jack Moffitt committed
157 158 159
	/* grab a read lock, to make sure we get a chance to cleanup */
	thread_rwlock_rlock(source->shutdown_rwlock);

160 161
    /* The caller has ensured we have a write lock on the tree... */

Jack Moffitt's avatar
Jack Moffitt committed
162 163 164 165 166 167 168 169
	/* insert source onto source tree */
	avl_insert(global.source_tree, (void *)source);
	/* release write lock on global source tree */
	avl_tree_unlock(global.source_tree);


	/* start off the statistics */
	stats_event(source->mount, "listeners", "0");
170 171 172 173 174 175
	source->listeners = 0;
	if ((s = httpp_getvar(source->parser, "ice-name"))) {
		for (i=0;i<source->num_yp_directories;i++) {
			if (source->ypdata[i]->server_name) {
				free(source->ypdata[i]->server_name);
			}
176
			source->ypdata[i]->server_name = malloc(strlen(s) +1);
177 178
			strcpy(source->ypdata[i]->server_name, s);
		}
Jack Moffitt's avatar
Jack Moffitt committed
179
		stats_event(source->mount, "name", s);
180 181 182 183 184 185
	}
	if ((s = httpp_getvar(source->parser, "ice-url"))) {
		for (i=0;i<source->num_yp_directories;i++) {
			if (source->ypdata[i]->server_url) {
				free(source->ypdata[i]->server_url);
			}
186
			source->ypdata[i]->server_url = malloc(strlen(s) +1);
187 188
			strcpy(source->ypdata[i]->server_url, s);
		}
Jack Moffitt's avatar
Jack Moffitt committed
189
		stats_event(source->mount, "url", s);
190 191 192 193 194 195
	}
	if ((s = httpp_getvar(source->parser, "ice-genre"))) {
		for (i=0;i<source->num_yp_directories;i++) {
			if (source->ypdata[i]->server_genre) {
				free(source->ypdata[i]->server_genre);
			}
196
			source->ypdata[i]->server_genre = malloc(strlen(s) +1);
197 198
			strcpy(source->ypdata[i]->server_genre, s);
		}
199
		stats_event(source->mount, "genre", s);
200 201 202 203 204 205
	}
	if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
		for (i=0;i<source->num_yp_directories;i++) {
			if (source->ypdata[i]->bitrate) {
				free(source->ypdata[i]->bitrate);
			}
206
			source->ypdata[i]->bitrate = malloc(strlen(s) +1);
207 208
			strcpy(source->ypdata[i]->bitrate, s);
		}
Jack Moffitt's avatar
Jack Moffitt committed
209
		stats_event(source->mount, "bitrate", s);
210 211 212 213 214 215
	}
	if ((s = httpp_getvar(source->parser, "ice-description"))) {
		for (i=0;i<source->num_yp_directories;i++) {
			if (source->ypdata[i]->server_desc) {
				free(source->ypdata[i]->server_desc);
			}
216
			source->ypdata[i]->server_desc = malloc(strlen(s) +1);
217 218
			strcpy(source->ypdata[i]->server_desc, s);
		}
Jack Moffitt's avatar
Jack Moffitt committed
219
		stats_event(source->mount, "description", s);
220
	}
221
	if ((s = httpp_getvar(source->parser, "ice-private"))) {
222
		stats_event(source->mount, "public", s);
223
		suppress_yp = atoi(s);
224 225 226 227 228
	}
	for (i=0;i<source->num_yp_directories;i++) {
		if (source->ypdata[i]->server_type) {
			free(source->ypdata[i]->server_type);
		}
229 230 231 232
		source->ypdata[i]->server_type = malloc(
                strlen(source->format->format_description) + 1);
		strcpy(source->ypdata[i]->server_type, 
                source->format->format_description);
233
	}
234
    stats_event(source->mount, "type", source->format->format_description);
Jack Moffitt's avatar
Jack Moffitt committed
235

236
	for (i=0;i<source->num_yp_directories;i++) {
237
        int listen_url_size;
238 239 240
		if (source->ypdata[i]->listen_url) {
			free(source->ypdata[i]->listen_url);
		}
241 242 243 244 245 246 247 248
		/* 6 for max size of port */
		listen_url_size = strlen("http://") + 
            strlen(config_get_config()->hostname) + 
            strlen(":") + 6 + strlen(source->mount) + 1;
		source->ypdata[i]->listen_url = malloc(listen_url_size);
		sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
                config_get_config()->hostname, config_get_config()->port, 
                source->mount);
249 250
	}

251 252 253 254
	if(!suppress_yp) {
        yp_add(source, YP_ADD_ALL);

    	current_time = time(NULL);
255

256 257 258 259 260 261 262 263 264 265 266 267
	    for (i=0;i<source->num_yp_directories;i++) {
		    source->ypdata[i]->yp_last_touch = current_time;
            /* Don't permit touch intervals of less than 30 seconds */
	    	if (source->ypdata[i]->yp_touch_interval <= 30) {
		    	source->ypdata[i]->yp_touch_interval = 30;
    		}
	    }
    }

	while (global.running == ICE_RUNNING && source->running) {
		if(!suppress_yp) {
            current_time = time(NULL);
268
			for (i=0;i<source->num_yp_directories;i++) {
269 270 271
				if (current_time > (source->ypdata[i]->yp_last_touch + 
                            source->ypdata[i]->yp_touch_interval)) {
                    current_song[0] = 0;
272
					if (stats_get_value(source->mount, "artist")) {
273 274 275
						strncat(current_song, 
                                stats_get_value(source->mount, "artist"), 
                                sizeof(current_song) - 1);
276 277 278 279 280
						if (strlen(current_song) + 4 < sizeof(current_song)) {
							strncat(current_song, " - ", 3);
						}
					}
					if (stats_get_value(source->mount, "title")) {
281 282 283 284 285 286 287 288
						if (strlen(current_song) + 
                                strlen(stats_get_value(source->mount, "title"))
                                < sizeof(current_song) -1) 
                        {
							strncat(current_song, 
                                    stats_get_value(source->mount, "title"), 
                                    sizeof(current_song) - 1 - 
                                    strlen(current_song));
289 290
						}
					}
291
                    
292 293 294 295 296
					if (source->ypdata[i]->current_song) {
						free(source->ypdata[i]->current_song);
						source->ypdata[i]->current_song = NULL;
					}
	
297 298
					source->ypdata[i]->current_song = 
                        malloc(strlen(current_song) + 1);
299 300
					strcpy(source->ypdata[i]->current_song, current_song);
	
301 302
					thread_create("YP Touch Thread", yp_touch_thread, 
                            (void *)source, THREAD_DETACHED); 
303 304 305
				}
			}
		}
306

307
		ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
308 309 310 311
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
312
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
Jack Moffitt's avatar
Jack Moffitt committed
313 314 315
		while (refbuf == NULL) {
			bytes = 0;
			while (bytes <= 0) {
316 317 318
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

				if (ret <= 0) { /* timeout expired */
319 320
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
321 322 323
					bytes = 0;
					break;
				}
Jack Moffitt's avatar
Jack Moffitt committed
324 325

				bytes = sock_read_bytes(source->con->sock, buffer, 4096);
326 327 328
				if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) {
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
329
                    break;
330
                }
Jack Moffitt's avatar
Jack Moffitt committed
331 332
			}
			if (bytes <= 0) break;
333
            source->client->con->sent_bytes += bytes;
Michael Smith's avatar
Michael Smith committed
334 335 336 337 338
			ret = source->format->get_buffer(source->format, buffer, bytes, &refbuf);
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
Jack Moffitt's avatar
Jack Moffitt committed
339 340 341
		}

		if (bytes <= 0) {
342
			INFO0("Removing source following disconnection");
Jack Moffitt's avatar
Jack Moffitt committed
343 344 345 346 347
			break;
		}

		/* we have a refbuf buffer, which a data block to be sent to 
		** all clients.  if a client is not able to send the buffer
Michael Smith's avatar
Michael Smith committed
348
		** immediately, it should store it on its queue for the next
Jack Moffitt's avatar
Jack Moffitt committed
349 350 351
		** go around.
		**
		** instead of sending the current block, a client should send
Michael Smith's avatar
Michael Smith committed
352
		** all data in the queue, plus the current block, until either
Jack Moffitt's avatar
Jack Moffitt committed
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
		** it runs out of data, or it hits a recoverable error like
		** EAGAIN.  this will allow a client that got slightly lagged
		** to catch back up if it can
		*/

		/* acquire read lock on client_tree */
		avl_tree_rlock(source->client_tree);

		client_node = avl_get_first(source->client_tree);
		while (client_node) {
			/* acquire read lock on node */
			avl_node_wlock(client_node);

			client = (client_t *)client_node->key;
			
			data_done = 0;

			/* do we have any old buffers? */
			abuf = refbuf_queue_remove(&client->queue);
			while (abuf) {
				if (client->pos > 0)
					bytes = abuf->len - client->pos;
				else
					bytes = abuf->len;

378 379
				sbytes = source->format->write_buf_to_client(source->format,
                        client, &abuf->data[client->pos], bytes);
380 381
				if (sbytes >= 0) {
                    if(sbytes != bytes) {
382 383 384
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
385 386 387 388 389 390
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
391 392 393
                else {
                    DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected");
                    client->con->error = 1;
Jack Moffitt's avatar
Jack Moffitt committed
394
					data_done = 1;
395
                    refbuf_release(abuf);
Jack Moffitt's avatar
Jack Moffitt committed
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410
					break;
				}
				
				/* we're done with that refbuf, release it and reset the pos */
				refbuf_release(abuf);
				client->pos = 0;

				abuf = refbuf_queue_remove(&client->queue);
			}
			
			/* now send or queue the new data */
			if (data_done) {
				refbuf_addref(refbuf);
				refbuf_queue_add(&client->queue, refbuf);
			} else {
411 412
				sbytes = source->format->write_buf_to_client(source->format,
                        client, refbuf->data, refbuf->len);
413
				if (sbytes >= 0) {
414 415
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
416 417
                        client->pos = sbytes;
						refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
418
                        refbuf_queue_insert(&client->queue, refbuf);
419 420
                    }
                }
421 422 423
                else {
                    DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection");
                    client->con->error = 1;
Jack Moffitt's avatar
Jack Moffitt committed
424 425 426 427 428 429 430 431 432 433
				}
			}

			/* if the client is too slow, its queue will slowly build up.
			** we need to make sure the client is keeping up with the
			** data, so we'll kick any client who's queue gets to large.
			** the queue_limit might need to be tuned, but should work fine.
			** TODO: put queue_limit in a config file
			*/
			if (refbuf_queue_size(&client->queue) > 25) {
434
                DEBUG0("Client has fallen too far behind, removing");
Jack Moffitt's avatar
Jack Moffitt committed
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
				client->con->error = 1;
			}

			/* release read lock on node */
			avl_node_unlock(client_node);

			/* get the next node */
			client_node = avl_get_next(client_node);
		}
		/* release read lock on client_tree */
		avl_tree_unlock(source->client_tree);

		refbuf_release(refbuf);

		/* acquire write lock on client_tree */
		avl_tree_wlock(source->client_tree);

		/** delete bad clients **/
		client_node = avl_get_first(source->client_tree);
		while (client_node) {
			client = (client_t *)client_node->key;
			if (client->con->error) {
				client_node = avl_get_next(client_node);
				avl_delete(source->client_tree, (void *)client, _free_client);
				listeners--;
				stats_event_args(source->mount, "listeners", "%d", listeners);
461
				source->listeners = listeners;
462
                DEBUG0("Client removed");
Jack Moffitt's avatar
Jack Moffitt committed
463 464 465 466 467 468 469 470 471 472 473 474 475
				continue;
			}
			client_node = avl_get_next(client_node);
		}

		/* acquire write lock on pending_tree */
		avl_tree_wlock(source->pending_tree);

		/** add pending clients **/
		client_node = avl_get_first(source->pending_tree);
		while (client_node) {
			avl_insert(source->client_tree, client_node->key);
			listeners++;
476
            DEBUG0("Client added");
Jack Moffitt's avatar
Jack Moffitt committed
477 478 479
			stats_event_inc(NULL, "clients");
			stats_event_inc(source->mount, "connections");
			stats_event_args(source->mount, "listeners", "%d", listeners);
480
			source->listeners = listeners;
Jack Moffitt's avatar
Jack Moffitt committed
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504

			/* we have to send cached headers for some data formats
			** this is where we queue up the buffers to send
			*/
			if (source->format->has_predata) {
				client = (client_t *)client_node->key;
				client->queue = source->format->get_predata(source->format);
			}

			client_node = avl_get_next(client_node);
		}

		/** clear pending tree **/
		while (avl_get_first(source->pending_tree)) {
			avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, _remove_client);
		}

		/* release write lock on pending_tree */
		avl_tree_unlock(source->pending_tree);

		/* release write lock on client_tree */
		avl_tree_unlock(source->client_tree);
	}

Michael Smith's avatar
Michael Smith committed
505 506
done:

507
    DEBUG0("Source exiting");
508
	if(!suppress_yp) {
509 510
		yp_remove(source);
	}
Jack Moffitt's avatar
Jack Moffitt committed
511

Michael Smith's avatar
Michael Smith committed
512 513 514 515
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

Jack Moffitt's avatar
Jack Moffitt committed
516 517 518
	/* we need to empty the client and pending trees */
	avl_tree_wlock(source->pending_tree);
	while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
519 520 521 522 523
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
            avl_delete(source->pending_tree, client, _remove_client);

524
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
525 526 527 528 529 530 531
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
    		avl_delete(source->pending_tree, client, _free_client);
        }
Jack Moffitt's avatar
Jack Moffitt committed
532 533
	}
	avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
534

Jack Moffitt's avatar
Jack Moffitt committed
535 536
	avl_tree_wlock(source->client_tree);
	while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
537 538 539 540 541
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
            avl_delete(source->client_tree, client, _remove_client);

542
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
543 544 545 546 547 548 549
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
		    avl_delete(source->client_tree, client, _free_client);
        }
Jack Moffitt's avatar
Jack Moffitt committed
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
	}
	avl_tree_unlock(source->client_tree);

	/* delete this sources stats */
	stats_event_dec(NULL, "sources");
	stats_event(source->mount, "listeners", NULL);

	global_lock();
	global.sources--;
	global_unlock();

	/* release our hold on the lock so the main thread can continue cleaning up */
	thread_rwlock_unlock(source->shutdown_rwlock);

	avl_tree_wlock(global.source_tree);
	avl_delete(global.source_tree, source, source_free_source);
	avl_tree_unlock(global.source_tree);

	thread_exit(0);
      
	return NULL;
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
	connection_t *cona = (connection_t *)a;
576
    connection_t *conb = (connection_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
577 578 579 580 581 582 583 584 585 586 587 588 589 590 591

	if (cona->id < conb->id) return -1;
	if (cona->id > conb->id) return 1;

	return 0;
}

/* Key callback for avl_delete() that deliberately leaves the key alone.
 * Used when a client is only being taken out of one tree (for example to
 * be placed in another) and must not be destroyed.  Always returns 1.
 */
static int _remove_client(void *key)
{
    (void)key;  /* intentionally untouched */

    return 1;
}

static int _free_client(void *key)
{
	client_t *client = (client_t *)key;
592 593 594 595 596

	global_lock();
	global.clients--;
	global_unlock();
	stats_event_dec(NULL, "clients");
Jack Moffitt's avatar
Jack Moffitt committed
597 598 599 600 601
	
	client_destroy(client);
	
	return 1;
}
602