source.c 24 KB
Newer Older
Jack Moffitt's avatar
Jack Moffitt committed
1 2 3 4
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
5
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
6
#include <errno.h>
7 8

#ifndef _WIN32
9
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
10 11
#include <sys/time.h>
#include <sys/socket.h>
12
#else
13 14
#include <winsock2.h>
#include <windows.h>
15
#endif
Jack Moffitt's avatar
Jack Moffitt committed
16 17 18 19 20 21 22 23 24 25 26

#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "sock.h"

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
27
#include "log.h"
28
#include "logging.h"
29
#include "config.h"
30
#include "util.h"
31
#include "geturl.h"
Jack Moffitt's avatar
Jack Moffitt committed
32
#include "source.h"
33
#include "format.h"
Jack Moffitt's avatar
Jack Moffitt committed
34

35 36 37
#undef CATMODULE
#define CATMODULE "source"

38 39 40 41 42 43 44 45 46 47 48 49
/* Field selectors passed as the `type` argument of _add_yp_info().
 * Each one names the per-directory ypdata field that the call updates.
 */
#define  YP_SERVER_NAME 1      /* ypdata->server_name (string) */
#define  YP_SERVER_DESC 2      /* ypdata->server_desc (string) */
#define  YP_SERVER_GENRE 3     /* ypdata->server_genre (string) */
#define  YP_SERVER_URL 4       /* ypdata->server_url (string) */
#define  YP_BITRATE 5          /* ypdata->bitrate (string) */
#define  YP_AUDIO_INFO 6       /* ypdata->audio_info (string) */
#define  YP_SERVER_TYPE 7      /* ypdata->server_type (string) */
#define  YP_CURRENT_SONG 8     /* ypdata->current_song (string) */
#define  YP_URL_TIMEOUT 9      /* ypdata->yp_url_timeout (integer carried in the pointer) */
#define  YP_TOUCH_INTERVAL 10  /* ypdata->yp_touch_interval (integer carried in the pointer) */
#define  YP_LAST_TOUCH 11      /* ypdata->yp_last_touch (integer carried in the pointer) */

Jack Moffitt's avatar
Jack Moffitt committed
50 51 52 53
/* Callbacks handed to the avl tree routines: ordering of clients, and the
 * two "free key" flavours (detach only vs. detach and destroy).
 */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _remove_client(void *key);
static int _free_client(void *key);

/* Helpers that publish source metadata to the stats and YP records. */
static int _parse_audio_info(source_t *source, char *s);
static void _add_yp_info(source_t *source, char *stat_name,
        void *info, int type);
Jack Moffitt's avatar
Jack Moffitt committed
57

58
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
59 60
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
61 62 63 64
{
	source_t *src;

	src = (source_t *)malloc(sizeof(source_t));
65
    src->client = client;
Jack Moffitt's avatar
Jack Moffitt committed
66
	src->mount = (char *)strdup(mount);
67
    src->fallback_mount = NULL;
68
	src->format = format_get_plugin(type, src->mount, parser);
Jack Moffitt's avatar
Jack Moffitt committed
69 70 71 72
	src->con = con;
	src->parser = parser;
	src->client_tree = avl_tree_new(_compare_clients, NULL);
	src->pending_tree = avl_tree_new(_compare_clients, NULL);
73
    src->running = 1;
74 75
	src->num_yp_directories = 0;
	src->listeners = 0;
76
    src->max_listeners = -1;
77
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
78 79
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
80
    src->audio_info = util_dict_new();
Jack Moffitt's avatar
Jack Moffitt committed
81

Michael Smith's avatar
Michael Smith committed
82 83 84 85 86 87 88 89 90 91 92 93 94 95
    if(mountinfo != NULL) {
        src->fallback_mount = mountinfo->fallback_mount;
        src->max_listeners = mountinfo->max_listeners;
        src->dumpfilename = mountinfo->dumpfile;
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

Jack Moffitt's avatar
Jack Moffitt committed
96 97 98 99 100 101 102 103 104 105 106 107
	return src;
}

/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
	source_t *source;
	avl_node *node;
	int cmp;

108 109 110
	if (!mount) {
		return NULL;
	}
Jack Moffitt's avatar
Jack Moffitt committed
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
	/* get the root node */
	node = global.source_tree->root->right;
	
	while (node) {
		source = (source_t *)node->key;
		cmp = strcmp(mount, source->mount);
		if (cmp < 0) 
			node = node->left;
		else if (cmp > 0)
			node = node->right;
		else
			return source;
	}
	
	/* didn't find it */
	return NULL;
}

/* avl ordering callback for sources: orders two source_t keys by their
 * mountpoint name.  `arg` is unused.
 */
int source_compare_sources(void *arg, void *a, void *b)
{
    const char *left_mount = ((source_t *)a)->mount;
    const char *right_mount = ((source_t *)b)->mount;

    return strcmp(left_mount, right_mount);
}

int source_free_source(void *key)
{
	source_t *source = (source_t *)key;
140
	int i=0;
Jack Moffitt's avatar
Jack Moffitt committed
141 142

	free(source->mount);
Michael Smith's avatar
Michael Smith committed
143
    free(source->fallback_mount);
144
    client_destroy(source->client);
Jack Moffitt's avatar
Jack Moffitt committed
145 146
	avl_tree_free(source->pending_tree, _free_client);
	avl_tree_free(source->client_tree, _free_client);
147
	source->format->free_plugin(source->format);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
148 149 150
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
151
    util_dict_free(source->audio_info);
Jack Moffitt's avatar
Jack Moffitt committed
152 153 154 155 156 157
	free(source);

	return 1;
}
	

158 159 160
/* The caller MUST have a current write lock on global.source_tree when calling
 * this
 */
Jack Moffitt's avatar
Jack Moffitt committed
161 162 163
void *source_main(void *arg)
{
	source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
164
    source_t *fallback_source;
Jack Moffitt's avatar
Jack Moffitt committed
165 166
	char buffer[4096];
	long bytes, sbytes;
167
	int ret, timeout;
Jack Moffitt's avatar
Jack Moffitt committed
168 169
	client_t *client;
	avl_node *client_node;
Jack Moffitt's avatar
Jack Moffitt committed
170
	char *s;
171 172
	long current_time;
	char current_song[256];
Jack Moffitt's avatar
Jack Moffitt committed
173 174 175 176

	refbuf_t *refbuf, *abuf;
	int data_done;

177 178 179 180
    int listeners = 0;
    int	i=0;
    int	suppress_yp = 0;
    char *ai;
Jack Moffitt's avatar
Jack Moffitt committed
181

182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
    long queue_limit;
    ice_config_t *config;
    char *hostname;
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
	timeout = config->source_timeout;
    hostname = config->hostname;
    port = config->port;

	for (i=0;i<config->num_yp_directories;i++) {
		if (config->yp_url[i]) {
			source->ypdata[source->num_yp_directories] = yp_create_ypdata();
			source->ypdata[source->num_yp_directories]->yp_url = 
                config->yp_url[i];
			source->ypdata[source->num_yp_directories]->yp_url_timeout = 
                config->yp_url_timeout[i];
			source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
			source->num_yp_directories++;
		}
	}
205

206
    config_release_config();
207

Jack Moffitt's avatar
Jack Moffitt committed
208 209 210
	/* grab a read lock, to make sure we get a chance to cleanup */
	thread_rwlock_rlock(source->shutdown_rwlock);

211 212 213 214 215 216 217 218 219 220 221 222
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
    if (source_find_mount(source->mount) != NULL) {
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
        thread_exit(0);
        return NULL;
    }
Jack Moffitt's avatar
Jack Moffitt committed
223 224 225 226 227
	/* insert source onto source tree */
	avl_insert(global.source_tree, (void *)source);
	/* release write lock on global source tree */
	avl_tree_unlock(global.source_tree);

228 229 230 231 232 233 234 235 236
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
237 238 239

	/* start off the statistics */
	stats_event(source->mount, "listeners", "0");
240 241
	source->listeners = 0;
	if ((s = httpp_getvar(source->parser, "ice-name"))) {
242
        _add_yp_info(source, "server_name", s, YP_SERVER_NAME);
243 244
	}
	if ((s = httpp_getvar(source->parser, "ice-url"))) {
245
        _add_yp_info(source, "server_url", s, YP_SERVER_URL);
246 247
	}
	if ((s = httpp_getvar(source->parser, "ice-genre"))) {
248
        _add_yp_info(source, "genre", s, YP_SERVER_GENRE);
249 250
	}
	if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
251
        _add_yp_info(source, "bitrate", s, YP_BITRATE);
252 253
	}
	if ((s = httpp_getvar(source->parser, "ice-description"))) {
254
        _add_yp_info(source, "server_description", s, YP_SERVER_DESC);
255
	}
256
	if ((s = httpp_getvar(source->parser, "ice-private"))) {
257
		stats_event(source->mount, "public", s);
258
		suppress_yp = atoi(s);
259
	}
260 261 262
	if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
263
            _add_yp_info(source, "audio_info", 
264 265 266 267
                    ai,
                    YP_AUDIO_INFO);
        }
	}
268 269 270 271
	for (i=0;i<source->num_yp_directories;i++) {
		if (source->ypdata[i]->server_type) {
			free(source->ypdata[i]->server_type);
		}
272 273 274 275
		source->ypdata[i]->server_type = malloc(
                strlen(source->format->format_description) + 1);
		strcpy(source->ypdata[i]->server_type, 
                source->format->format_description);
276
	}
277
    stats_event(source->mount, "type", source->format->format_description);
Jack Moffitt's avatar
Jack Moffitt committed
278

279
	for (i=0;i<source->num_yp_directories;i++) {
280
        int listen_url_size;
281 282 283
		if (source->ypdata[i]->listen_url) {
			free(source->ypdata[i]->listen_url);
		}
284 285
		/* 6 for max size of port */
		listen_url_size = strlen("http://") + 
286
            strlen(hostname) + 
287 288 289
            strlen(":") + 6 + strlen(source->mount) + 1;
		source->ypdata[i]->listen_url = malloc(listen_url_size);
		sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
290
                hostname, port, source->mount);
291 292
	}

293 294 295 296
	if(!suppress_yp) {
        yp_add(source, YP_ADD_ALL);

    	current_time = time(NULL);
297

298 299 300
        _add_yp_info(source, "last_touch", (void *)current_time, 
            YP_LAST_TOUCH);

301
	    for (i=0;i<source->num_yp_directories;i++) {
302 303 304 305
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
            source->ypdata[i]->yp_last_touch = current_time - 
                source->ypdata[i]->yp_touch_interval + 5;
306 307 308 309 310 311 312
            /* Don't permit touch intervals of less than 30 seconds */
	    	if (source->ypdata[i]->yp_touch_interval <= 30) {
		    	source->ypdata[i]->yp_touch_interval = 30;
    		}
	    }
    }

313 314
    DEBUG0("Source creation complete");

315 316 317
	while (global.running == ICE_RUNNING && source->running) {
		if(!suppress_yp) {
            current_time = time(NULL);
318
			for (i=0;i<source->num_yp_directories;i++) {
319 320 321
				if (current_time > (source->ypdata[i]->yp_last_touch + 
                            source->ypdata[i]->yp_touch_interval)) {
                    current_song[0] = 0;
322
					if (stats_get_value(source->mount, "artist")) {
323 324 325
						strncat(current_song, 
                                stats_get_value(source->mount, "artist"), 
                                sizeof(current_song) - 1);
326 327 328 329 330
						if (strlen(current_song) + 4 < sizeof(current_song)) {
							strncat(current_song, " - ", 3);
						}
					}
					if (stats_get_value(source->mount, "title")) {
331 332 333 334 335 336 337 338
						if (strlen(current_song) + 
                                strlen(stats_get_value(source->mount, "title"))
                                < sizeof(current_song) -1) 
                        {
							strncat(current_song, 
                                    stats_get_value(source->mount, "title"), 
                                    sizeof(current_song) - 1 - 
                                    strlen(current_song));
339 340
						}
					}
341
                    
342 343 344 345 346
					if (source->ypdata[i]->current_song) {
						free(source->ypdata[i]->current_song);
						source->ypdata[i]->current_song = NULL;
					}
	
347 348
					source->ypdata[i]->current_song = 
                        malloc(strlen(current_song) + 1);
349 350
					strcpy(source->ypdata[i]->current_song, current_song);
	
351 352
					thread_create("YP Touch Thread", yp_touch_thread, 
                            (void *)source, THREAD_DETACHED); 
353 354 355
				}
			}
		}
356

357
		ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
358 359 360 361
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
362
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
Jack Moffitt's avatar
Jack Moffitt committed
363 364 365
		while (refbuf == NULL) {
			bytes = 0;
			while (bytes <= 0) {
366 367 368
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

				if (ret <= 0) { /* timeout expired */
369 370
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
371 372 373
					bytes = 0;
					break;
				}
Jack Moffitt's avatar
Jack Moffitt committed
374 375

				bytes = sock_read_bytes(source->con->sock, buffer, 4096);
376 377 378
				if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) {
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
379
                    break;
380
                }
Jack Moffitt's avatar
Jack Moffitt committed
381 382
			}
			if (bytes <= 0) break;
383
            source->client->con->sent_bytes += bytes;
Michael Smith's avatar
Michael Smith committed
384 385 386 387 388
			ret = source->format->get_buffer(source->format, buffer, bytes, &refbuf);
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
Jack Moffitt's avatar
Jack Moffitt committed
389 390 391
		}

		if (bytes <= 0) {
392
			INFO0("Removing source following disconnection");
Jack Moffitt's avatar
Jack Moffitt committed
393 394 395 396 397
			break;
		}

		/* we have a refbuf buffer, which a data block to be sent to 
		** all clients.  if a client is not able to send the buffer
Michael Smith's avatar
Michael Smith committed
398
		** immediately, it should store it on its queue for the next
Jack Moffitt's avatar
Jack Moffitt committed
399 400 401
		** go around.
		**
		** instead of sending the current block, a client should send
Michael Smith's avatar
Michael Smith committed
402
		** all data in the queue, plus the current block, until either
Jack Moffitt's avatar
Jack Moffitt committed
403 404 405 406 407
		** it runs out of data, or it hits a recoverable error like
		** EAGAIN.  this will allow a client that got slightly lagged
		** to catch back up if it can
		*/

Michael Smith's avatar
Michael Smith committed
408 409 410 411 412 413 414 415 416 417 418 419
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

Jack Moffitt's avatar
Jack Moffitt committed
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
		/* acquire read lock on client_tree */
		avl_tree_rlock(source->client_tree);

		client_node = avl_get_first(source->client_tree);
		while (client_node) {
			/* acquire read lock on node */
			avl_node_wlock(client_node);

			client = (client_t *)client_node->key;
			
			data_done = 0;

			/* do we have any old buffers? */
			abuf = refbuf_queue_remove(&client->queue);
			while (abuf) {
				if (client->pos > 0)
					bytes = abuf->len - client->pos;
				else
					bytes = abuf->len;

440 441
				sbytes = source->format->write_buf_to_client(source->format,
                        client, &abuf->data[client->pos], bytes);
442 443
				if (sbytes >= 0) {
                    if(sbytes != bytes) {
444 445 446
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
447 448 449 450 451 452
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
453 454 455
                else {
                    DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected");
                    client->con->error = 1;
Jack Moffitt's avatar
Jack Moffitt committed
456
					data_done = 1;
457
                    refbuf_release(abuf);
Jack Moffitt's avatar
Jack Moffitt committed
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
					break;
				}
				
				/* we're done with that refbuf, release it and reset the pos */
				refbuf_release(abuf);
				client->pos = 0;

				abuf = refbuf_queue_remove(&client->queue);
			}
			
			/* now send or queue the new data */
			if (data_done) {
				refbuf_addref(refbuf);
				refbuf_queue_add(&client->queue, refbuf);
			} else {
473 474
				sbytes = source->format->write_buf_to_client(source->format,
                        client, refbuf->data, refbuf->len);
475
				if (sbytes >= 0) {
476 477
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
478 479
                        client->pos = sbytes;
						refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
480
                        refbuf_queue_insert(&client->queue, refbuf);
481 482
                    }
                }
483 484 485
                else {
                    DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection");
                    client->con->error = 1;
Jack Moffitt's avatar
Jack Moffitt committed
486 487 488 489 490 491 492
				}
			}

			/* if the client is too slow, its queue will slowly build up.
			** we need to make sure the client is keeping up with the
			** data, so we'll kick any client who's queue gets to large.
			*/
493
			if (refbuf_queue_length(&client->queue) > queue_limit) {
494
                DEBUG0("Client has fallen too far behind, removing");
Jack Moffitt's avatar
Jack Moffitt committed
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520
				client->con->error = 1;
			}

			/* release read lock on node */
			avl_node_unlock(client_node);

			/* get the next node */
			client_node = avl_get_next(client_node);
		}
		/* release read lock on client_tree */
		avl_tree_unlock(source->client_tree);

		refbuf_release(refbuf);

		/* acquire write lock on client_tree */
		avl_tree_wlock(source->client_tree);

		/** delete bad clients **/
		client_node = avl_get_first(source->client_tree);
		while (client_node) {
			client = (client_t *)client_node->key;
			if (client->con->error) {
				client_node = avl_get_next(client_node);
				avl_delete(source->client_tree, (void *)client, _free_client);
				listeners--;
				stats_event_args(source->mount, "listeners", "%d", listeners);
521
				source->listeners = listeners;
522
                DEBUG0("Client removed");
Jack Moffitt's avatar
Jack Moffitt committed
523 524 525 526 527 528 529 530 531 532 533 534 535
				continue;
			}
			client_node = avl_get_next(client_node);
		}

		/* acquire write lock on pending_tree */
		avl_tree_wlock(source->pending_tree);

		/** add pending clients **/
		client_node = avl_get_first(source->pending_tree);
		while (client_node) {
			avl_insert(source->client_tree, client_node->key);
			listeners++;
536
            DEBUG0("Client added");
Jack Moffitt's avatar
Jack Moffitt committed
537 538 539
			stats_event_inc(NULL, "clients");
			stats_event_inc(source->mount, "connections");
			stats_event_args(source->mount, "listeners", "%d", listeners);
540
			source->listeners = listeners;
Jack Moffitt's avatar
Jack Moffitt committed
541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564

			/* we have to send cached headers for some data formats
			** this is where we queue up the buffers to send
			*/
			if (source->format->has_predata) {
				client = (client_t *)client_node->key;
				client->queue = source->format->get_predata(source->format);
			}

			client_node = avl_get_next(client_node);
		}

		/** clear pending tree **/
		while (avl_get_first(source->pending_tree)) {
			avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, _remove_client);
		}

		/* release write lock on pending_tree */
		avl_tree_unlock(source->pending_tree);

		/* release write lock on client_tree */
		avl_tree_unlock(source->client_tree);
	}

Michael Smith's avatar
Michael Smith committed
565 566
done:

567
    DEBUG0("Source exiting");
568
	if(!suppress_yp) {
569 570
		yp_remove(source);
	}
Jack Moffitt's avatar
Jack Moffitt committed
571

Michael Smith's avatar
Michael Smith committed
572 573 574 575
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

Jack Moffitt's avatar
Jack Moffitt committed
576 577 578
	/* we need to empty the client and pending trees */
	avl_tree_wlock(source->pending_tree);
	while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
579 580 581 582 583
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
            avl_delete(source->pending_tree, client, _remove_client);

584
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
585 586 587 588 589 590 591
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
    		avl_delete(source->pending_tree, client, _free_client);
        }
Jack Moffitt's avatar
Jack Moffitt committed
592 593
	}
	avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
594

Jack Moffitt's avatar
Jack Moffitt committed
595 596
	avl_tree_wlock(source->client_tree);
	while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
597 598 599 600 601
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
            avl_delete(source->client_tree, client, _remove_client);

602
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
603 604 605 606 607 608 609
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
		    avl_delete(source->client_tree, client, _free_client);
        }
Jack Moffitt's avatar
Jack Moffitt committed
610 611 612 613 614 615 616 617 618 619 620
	}
	avl_tree_unlock(source->client_tree);

	/* delete this sources stats */
	stats_event_dec(NULL, "sources");
	stats_event(source->mount, "listeners", NULL);

	global_lock();
	global.sources--;
	global_unlock();

621 622 623
    if(source->dumpfile)
        fclose(source->dumpfile);

Jack Moffitt's avatar
Jack Moffitt committed
624 625 626 627 628 629 630 631 632 633 634 635 636 637 638
	/* release our hold on the lock so the main thread can continue cleaning up */
	thread_rwlock_unlock(source->shutdown_rwlock);

	avl_tree_wlock(global.source_tree);
	avl_delete(global.source_tree, source, source_free_source);
	avl_tree_unlock(global.source_tree);

	thread_exit(0);
      
	return NULL;
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
	connection_t *cona = (connection_t *)a;
639
    connection_t *conb = (connection_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
640 641 642 643 644 645 646 647 648 649 650 651 652 653 654

	if (cona->id < conb->id) return -1;
	if (cona->id > conb->id) return 1;

	return 0;
}

/* avl "free key" callback that deliberately leaves the client alone: used
 * when a node is removed from a tree but the client itself lives on.
 * Always returns 1.
 */
static int _remove_client(void *key)
{
    (void)key; /* intentionally untouched */
    return 1;
}

static int _free_client(void *key)
{
	client_t *client = (client_t *)key;
655 656 657 658 659

	global_lock();
	global.clients--;
	global_unlock();
	stats_event_dec(NULL, "clients");
Jack Moffitt's avatar
Jack Moffitt committed
660 661 662 663 664
	
	client_destroy(client);
	
	return 1;
}
665

666 667
/* Parse an "ice-audio-info" header value of the form
 *     name=value;name=value;...
 * and record every pair that has a non-empty value both in
 * source->audio_info and as a stat on the source's mount.
 *
 * Tokens without '=' and pairs with an empty value are ignored, and empty
 * tokens (";;") are skipped.
 *
 * The string is scanned in place without being modified and without
 * strtok(): strtok() keeps hidden static state, and this function runs on
 * source threads, several of which can exist at once.
 *
 * Returns 1 when the string was processed, 0 when `s` is NULL or memory ran
 * out part-way through.
 */
static int _parse_audio_info(source_t *source, char *s)
{
    const char *cursor = s;

    if (s == NULL) {
        return 0;
    }

    while (*cursor != '\0') {
        size_t token_len;
        const char *eq;

        /* skip separators so that empty tokens are ignored */
        cursor += strspn(cursor, ";");
        if (*cursor == '\0') {
            break;
        }

        token_len = strcspn(cursor, ";");
        eq = memchr(cursor, '=', token_len);
        if (eq != NULL) {
            size_t name_len = (size_t)(eq - cursor);
            size_t value_len = token_len - name_len - 1;

            if (value_len > 0) {
                char *variable = malloc(name_len + 1);
                char *value = malloc(value_len + 1);

                if (variable == NULL || value == NULL) {
                    free(variable);
                    free(value);
                    return 0;
                }

                memcpy(variable, cursor, name_len);
                variable[name_len] = '\0';
                memcpy(value, eq + 1, value_len);
                value[value_len] = '\0';

                util_dict_set(source->audio_info, variable, value);
                stats_event(source->mount, variable, value);

                free(value);
                free(variable);
            }
        }

        cursor += token_len;
    }

    return 1;
}

/* Replace the heap string held in *slot with a private copy of `value`.
 * If memory cannot be allocated the previous value is left in place.
 */
static void _yp_replace_string(char **slot, const char *value)
{
    char *copy = malloc(strlen(value) + 1);

    if (copy == NULL) {
        return;
    }
    strcpy(copy, value);
    free(*slot);
    *slot = copy;
}

/* Store one piece of information in every YP directory record of `source`.
 *
 *   stat_name - name under which string values are also published as a
 *               stat on the source's mount (not used for YP_AUDIO_INFO,
 *               YP_SERVER_TYPE or the integer selectors).
 *   info      - for the string selectors, a NUL-terminated string that is
 *               copied; for YP_URL_TIMEOUT, YP_LAST_TOUCH and
 *               YP_TOUCH_INTERVAL, an integer carried in the pointer value.
 *   type      - one of the YP_* selectors; unknown values are ignored.
 *
 * Nothing happens when `info` is NULL (which for the integer selectors
 * means a value of 0 is never stored) or when the source has no YP
 * directories.
 */
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type)
{
    int i;

    if (!info) {
        return;
    }

    for (i = 0; i < source->num_yp_directories; i++) {
        char **slot = NULL;   /* string field to replace, if any */
        int publish = 0;      /* also emit the value as a stat? */

        switch (type) {
        case YP_SERVER_NAME:
            slot = &source->ypdata[i]->server_name;
            publish = 1;
            break;
        case YP_SERVER_DESC:
            slot = &source->ypdata[i]->server_desc;
            publish = 1;
            break;
        case YP_SERVER_GENRE:
            slot = &source->ypdata[i]->server_genre;
            publish = 1;
            break;
        case YP_SERVER_URL:
            slot = &source->ypdata[i]->server_url;
            publish = 1;
            break;
        case YP_BITRATE:
            slot = &source->ypdata[i]->bitrate;
            publish = 1;
            break;
        case YP_AUDIO_INFO:
            slot = &source->ypdata[i]->audio_info;
            break;
        case YP_SERVER_TYPE:
            slot = &source->ypdata[i]->server_type;
            break;
        case YP_CURRENT_SONG:
            slot = &source->ypdata[i]->current_song;
            publish = 1;
            break;
        /* Integer selectors: go through long first so the pointer is not
         * converted directly to a narrower integer type.
         */
        case YP_URL_TIMEOUT:
            source->ypdata[i]->yp_url_timeout = (int)(long)info;
            break;
        case YP_LAST_TOUCH:
            source->ypdata[i]->yp_last_touch = (int)(long)info;
            break;
        case YP_TOUCH_INTERVAL:
            source->ypdata[i]->yp_touch_interval = (int)(long)info;
            break;
        default:
            break;
        }

        if (slot != NULL) {
            _yp_replace_string(slot, (const char *)info);
            if (publish) {
                stats_event(source->mount, stat_name, (char *)info);
            }
        }
    }
}