source.c 24.1 KB
Newer Older
Jack Moffitt's avatar
Jack Moffitt committed
1 2 3 4
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
5
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
6
#include <errno.h>
7 8

#ifndef _WIN32
9
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
10 11
#include <sys/time.h>
#include <sys/socket.h>
12
#else
13 14
#include <winsock2.h>
#include <windows.h>
15
#endif
Jack Moffitt's avatar
Jack Moffitt committed
16 17 18 19 20 21 22 23 24 25 26

#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "sock.h"

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
27
#include "log.h"
28
#include "logging.h"
29
#include "config.h"
30
#include "util.h"
31
#include "geturl.h"
Jack Moffitt's avatar
Jack Moffitt committed
32
#include "source.h"
Michael Smith's avatar
Michael Smith committed
33
#include "format.h"
Jack Moffitt's avatar
Jack Moffitt committed
34

35 36 37
#undef CATMODULE
#define CATMODULE "source"

38 39 40 41 42 43 44 45 46 47 48 49
/* Selector values for the `type` argument of _add_yp_info(): each one names
 * the field of the per-directory YP data that the call should overwrite.
 * Values 1-8 select string fields (info is treated as a char *); values
 * 9-11 select numeric fields (info carries an integer cast to void *).
 */
#define  YP_SERVER_NAME 1
#define  YP_SERVER_DESC 2
#define  YP_SERVER_GENRE 3
#define  YP_SERVER_URL 4
#define  YP_BITRATE 5
#define  YP_AUDIO_INFO 6
#define  YP_SERVER_TYPE 7
#define  YP_CURRENT_SONG 8
#define  YP_URL_TIMEOUT 9
#define  YP_TOUCH_INTERVAL 10
#define  YP_LAST_TOUCH 11

Jack Moffitt's avatar
Jack Moffitt committed
50 51 52
/* avl tree helpers for the per-source client and pending trees:
 * _compare_clients orders the keys, _free_client is the key-free callback
 * handed to avl_delete()/avl_tree_free().
 */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
53 54 55
/* Splits an "ice-audio-info" header value into name=value pairs and records
 * them in source->audio_info and the stats.
 */
static int _parse_audio_info(source_t *source, char *s);
/* Stores `info` into the field selected by `type` (one of the YP_* values)
 * of every YP directory entry attached to the source.
 */
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type);
Jack Moffitt's avatar
Jack Moffitt committed
56

57
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
58 59
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
60 61 62 63
{
	source_t *src;

	src = (source_t *)malloc(sizeof(source_t));
64
    src->client = client;
Jack Moffitt's avatar
Jack Moffitt committed
65
	src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
66
    src->fallback_mount = NULL;
67
	src->format = format_get_plugin(type, src->mount, parser);
Jack Moffitt's avatar
Jack Moffitt committed
68 69 70 71
	src->con = con;
	src->parser = parser;
	src->client_tree = avl_tree_new(_compare_clients, NULL);
	src->pending_tree = avl_tree_new(_compare_clients, NULL);
72
    src->running = 1;
73 74
	src->num_yp_directories = 0;
	src->listeners = 0;
75
    src->max_listeners = -1;
76
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
77 78
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
79
    src->audio_info = util_dict_new();
Jack Moffitt's avatar
Jack Moffitt committed
80

Michael Smith's avatar
Michael Smith committed
81 82 83 84 85 86 87 88 89 90 91 92 93 94
    if(mountinfo != NULL) {
        src->fallback_mount = mountinfo->fallback_mount;
        src->max_listeners = mountinfo->max_listeners;
        src->dumpfilename = mountinfo->dumpfile;
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

Jack Moffitt's avatar
Jack Moffitt committed
95 96 97
	return src;
}

98 99 100 101 102
/* Key-free callback for avl_delete() on the global source tree: detaches the
 * node without releasing the source itself. Always reports success (1).
 */
static int source_remove_source(void *key)
{
    (void)key;  /* the source is freed separately */

    return 1;
}

Jack Moffitt's avatar
Jack Moffitt committed
103 104 105 106 107 108 109 110 111
/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
	source_t *source;
	avl_node *node;
	int cmp;

112 113 114
	if (!mount) {
		return NULL;
	}
Jack Moffitt's avatar
Jack Moffitt committed
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
	/* get the root node */
	node = global.source_tree->root->right;
	
	while (node) {
		source = (source_t *)node->key;
		cmp = strcmp(mount, source->mount);
		if (cmp < 0) 
			node = node->left;
		else if (cmp > 0)
			node = node->right;
		else
			return source;
	}
	
	/* didn't find it */
	return NULL;
}

/* Ordering callback for the source tree: both keys are source_t pointers and
 * are ordered by strcmp() on their mountpoint names. `arg` is unused.
 */
int source_compare_sources(void *arg, void *a, void *b)
{
    const char *left_mount = ((source_t *)a)->mount;
    const char *right_mount = ((source_t *)b)->mount;

    (void)arg;

    return strcmp(left_mount, right_mount);
}

int source_free_source(void *key)
{
143
    source_t *source = key;
144
	int i=0;
Jack Moffitt's avatar
Jack Moffitt committed
145 146

	free(source->mount);
Michael Smith's avatar
Michael Smith committed
147
    free(source->fallback_mount);
148
    client_destroy(source->client);
Jack Moffitt's avatar
Jack Moffitt committed
149 150
	avl_tree_free(source->pending_tree, _free_client);
	avl_tree_free(source->client_tree, _free_client);
151
	source->format->free_plugin(source->format);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
152 153 154
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
155
    util_dict_free(source->audio_info);
Jack Moffitt's avatar
Jack Moffitt committed
156 157 158 159 160 161
	free(source);

	return 1;
}
	

162 163 164
/* The caller MUST have a current write lock on global.source_tree when calling
 * this
 */
Jack Moffitt's avatar
Jack Moffitt committed
165 166 167
void *source_main(void *arg)
{
	source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
168
    source_t *fallback_source;
Jack Moffitt's avatar
Jack Moffitt committed
169 170
	char buffer[4096];
	long bytes, sbytes;
171
	int ret, timeout;
Jack Moffitt's avatar
Jack Moffitt committed
172 173
	client_t *client;
	avl_node *client_node;
Jack Moffitt's avatar
Jack Moffitt committed
174
	char *s;
175 176
	long current_time;
	char current_song[256];
Jack Moffitt's avatar
Jack Moffitt committed
177 178 179 180

	refbuf_t *refbuf, *abuf;
	int data_done;

181 182 183 184
    int listeners = 0;
    int	i=0;
    int	suppress_yp = 0;
    char *ai;
Jack Moffitt's avatar
Jack Moffitt committed
185

Michael Smith's avatar
Michael Smith committed
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
    long queue_limit;
    ice_config_t *config;
    char *hostname;
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
	timeout = config->source_timeout;
    hostname = config->hostname;
    port = config->port;

	for (i=0;i<config->num_yp_directories;i++) {
		if (config->yp_url[i]) {
			source->ypdata[source->num_yp_directories] = yp_create_ypdata();
			source->ypdata[source->num_yp_directories]->yp_url = 
                config->yp_url[i];
			source->ypdata[source->num_yp_directories]->yp_url_timeout = 
                config->yp_url_timeout[i];
			source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
			source->num_yp_directories++;
		}
	}
209

Michael Smith's avatar
Michael Smith committed
210
    config_release_config();
211

Jack Moffitt's avatar
Jack Moffitt committed
212 213 214
	/* grab a read lock, to make sure we get a chance to cleanup */
	thread_rwlock_rlock(source->shutdown_rwlock);

215 216 217 218 219 220 221 222 223 224 225 226
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
    if (source_find_mount(source->mount) != NULL) {
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
        thread_exit(0);
        return NULL;
    }
Jack Moffitt's avatar
Jack Moffitt committed
227 228 229 230 231
	/* insert source onto source tree */
	avl_insert(global.source_tree, (void *)source);
	/* release write lock on global source tree */
	avl_tree_unlock(global.source_tree);

232 233 234 235 236 237 238 239 240
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
241 242 243

	/* start off the statistics */
	stats_event(source->mount, "listeners", "0");
244 245
	source->listeners = 0;
	if ((s = httpp_getvar(source->parser, "ice-name"))) {
246
        _add_yp_info(source, "server_name", s, YP_SERVER_NAME);
247 248
	}
	if ((s = httpp_getvar(source->parser, "ice-url"))) {
249
        _add_yp_info(source, "server_url", s, YP_SERVER_URL);
250 251
	}
	if ((s = httpp_getvar(source->parser, "ice-genre"))) {
252
        _add_yp_info(source, "genre", s, YP_SERVER_GENRE);
253 254
	}
	if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
255
        _add_yp_info(source, "bitrate", s, YP_BITRATE);
256 257
	}
	if ((s = httpp_getvar(source->parser, "ice-description"))) {
258
        _add_yp_info(source, "server_description", s, YP_SERVER_DESC);
259
	}
260
	if ((s = httpp_getvar(source->parser, "ice-private"))) {
261
		stats_event(source->mount, "public", s);
262
		suppress_yp = atoi(s);
263
	}
264 265 266
	if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
267
            _add_yp_info(source, "audio_info", 
268 269 270 271
                    ai,
                    YP_AUDIO_INFO);
        }
	}
272 273 274 275
	for (i=0;i<source->num_yp_directories;i++) {
		if (source->ypdata[i]->server_type) {
			free(source->ypdata[i]->server_type);
		}
276 277 278 279
		source->ypdata[i]->server_type = malloc(
                strlen(source->format->format_description) + 1);
		strcpy(source->ypdata[i]->server_type, 
                source->format->format_description);
280
	}
281
    stats_event(source->mount, "type", source->format->format_description);
Jack Moffitt's avatar
Jack Moffitt committed
282

283
	for (i=0;i<source->num_yp_directories;i++) {
284
        int listen_url_size;
285 286 287
		if (source->ypdata[i]->listen_url) {
			free(source->ypdata[i]->listen_url);
		}
288 289
		/* 6 for max size of port */
		listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
290
            strlen(hostname) + 
291 292 293
            strlen(":") + 6 + strlen(source->mount) + 1;
		source->ypdata[i]->listen_url = malloc(listen_url_size);
		sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
294
                hostname, port, source->mount);
295 296
	}

297 298 299 300
	if(!suppress_yp) {
        yp_add(source, YP_ADD_ALL);

    	current_time = time(NULL);
301

302 303 304
        _add_yp_info(source, "last_touch", (void *)current_time, 
            YP_LAST_TOUCH);

305
	    for (i=0;i<source->num_yp_directories;i++) {
306 307 308 309
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
            source->ypdata[i]->yp_last_touch = current_time - 
                source->ypdata[i]->yp_touch_interval + 5;
310 311 312 313 314 315 316
            /* Don't permit touch intervals of less than 30 seconds */
	    	if (source->ypdata[i]->yp_touch_interval <= 30) {
		    	source->ypdata[i]->yp_touch_interval = 30;
    		}
	    }
    }

317 318
    DEBUG0("Source creation complete");

319 320 321
	while (global.running == ICE_RUNNING && source->running) {
		if(!suppress_yp) {
            current_time = time(NULL);
322
			for (i=0;i<source->num_yp_directories;i++) {
323 324 325
				if (current_time > (source->ypdata[i]->yp_last_touch + 
                            source->ypdata[i]->yp_touch_interval)) {
                    current_song[0] = 0;
326
					if (stats_get_value(source->mount, "artist")) {
327 328 329
						strncat(current_song, 
                                stats_get_value(source->mount, "artist"), 
                                sizeof(current_song) - 1);
330 331 332 333 334
						if (strlen(current_song) + 4 < sizeof(current_song)) {
							strncat(current_song, " - ", 3);
						}
					}
					if (stats_get_value(source->mount, "title")) {
335 336 337 338 339 340 341 342
						if (strlen(current_song) + 
                                strlen(stats_get_value(source->mount, "title"))
                                < sizeof(current_song) -1) 
                        {
							strncat(current_song, 
                                    stats_get_value(source->mount, "title"), 
                                    sizeof(current_song) - 1 - 
                                    strlen(current_song));
343 344
						}
					}
345
                    
346 347 348 349 350
					if (source->ypdata[i]->current_song) {
						free(source->ypdata[i]->current_song);
						source->ypdata[i]->current_song = NULL;
					}
	
351 352
					source->ypdata[i]->current_song = 
                        malloc(strlen(current_song) + 1);
353 354
					strcpy(source->ypdata[i]->current_song, current_song);
	
355 356
					thread_create("YP Touch Thread", yp_touch_thread, 
                            (void *)source, THREAD_DETACHED); 
357 358 359
				}
			}
		}
360

361
		ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
362 363 364 365
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
366
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
Jack Moffitt's avatar
Jack Moffitt committed
367 368 369
		while (refbuf == NULL) {
			bytes = 0;
			while (bytes <= 0) {
370 371 372
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

				if (ret <= 0) { /* timeout expired */
373 374
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
375 376 377
					bytes = 0;
					break;
				}
Jack Moffitt's avatar
Jack Moffitt committed
378 379

				bytes = sock_read_bytes(source->con->sock, buffer, 4096);
380 381 382
				if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) {
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
383
                    break;
384
                }
Jack Moffitt's avatar
Jack Moffitt committed
385 386
			}
			if (bytes <= 0) break;
387
            source->client->con->sent_bytes += bytes;
Michael Smith's avatar
Michael Smith committed
388 389 390 391 392
			ret = source->format->get_buffer(source->format, buffer, bytes, &refbuf);
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
Jack Moffitt's avatar
Jack Moffitt committed
393 394 395
		}

		if (bytes <= 0) {
396
			INFO0("Removing source following disconnection");
Jack Moffitt's avatar
Jack Moffitt committed
397 398 399 400 401
			break;
		}

		/* we have a refbuf buffer, which a data block to be sent to 
		** all clients.  if a client is not able to send the buffer
Michael Smith's avatar
Michael Smith committed
402
		** immediately, it should store it on its queue for the next
Jack Moffitt's avatar
Jack Moffitt committed
403 404 405
		** go around.
		**
		** instead of sending the current block, a client should send
Michael Smith's avatar
Michael Smith committed
406
		** all data in the queue, plus the current block, until either
Jack Moffitt's avatar
Jack Moffitt committed
407 408 409 410 411
		** it runs out of data, or it hits a recoverable error like
		** EAGAIN.  this will allow a client that got slightly lagged
		** to catch back up if it can
		*/

Michael Smith's avatar
Michael Smith committed
412 413 414 415 416 417 418 419 420 421 422 423
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

Jack Moffitt's avatar
Jack Moffitt committed
424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
		/* acquire read lock on client_tree */
		avl_tree_rlock(source->client_tree);

		client_node = avl_get_first(source->client_tree);
		while (client_node) {
			/* acquire read lock on node */
			avl_node_wlock(client_node);

			client = (client_t *)client_node->key;
			
			data_done = 0;

			/* do we have any old buffers? */
			abuf = refbuf_queue_remove(&client->queue);
			while (abuf) {
				if (client->pos > 0)
					bytes = abuf->len - client->pos;
				else
					bytes = abuf->len;

444 445
				sbytes = source->format->write_buf_to_client(source->format,
                        client, &abuf->data[client->pos], bytes);
446 447
				if (sbytes >= 0) {
                    if(sbytes != bytes) {
448 449 450
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
451 452 453 454 455 456
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
457 458 459
                else {
                    DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected");
                    client->con->error = 1;
Jack Moffitt's avatar
Jack Moffitt committed
460
					data_done = 1;
461
                    refbuf_release(abuf);
Jack Moffitt's avatar
Jack Moffitt committed
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476
					break;
				}
				
				/* we're done with that refbuf, release it and reset the pos */
				refbuf_release(abuf);
				client->pos = 0;

				abuf = refbuf_queue_remove(&client->queue);
			}
			
			/* now send or queue the new data */
			if (data_done) {
				refbuf_addref(refbuf);
				refbuf_queue_add(&client->queue, refbuf);
			} else {
477 478
				sbytes = source->format->write_buf_to_client(source->format,
                        client, refbuf->data, refbuf->len);
479
				if (sbytes >= 0) {
480 481
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
482 483
                        client->pos = sbytes;
						refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
484
                        refbuf_queue_insert(&client->queue, refbuf);
485 486
                    }
                }
487 488 489
                else {
                    DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection");
                    client->con->error = 1;
Jack Moffitt's avatar
Jack Moffitt committed
490 491 492 493 494 495 496
				}
			}

			/* if the client is too slow, its queue will slowly build up.
			** we need to make sure the client is keeping up with the
			** data, so we'll kick any client who's queue gets to large.
			*/
497
			if (refbuf_queue_length(&client->queue) > queue_limit) {
498
                DEBUG0("Client has fallen too far behind, removing");
Jack Moffitt's avatar
Jack Moffitt committed
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
				client->con->error = 1;
			}

			/* release read lock on node */
			avl_node_unlock(client_node);

			/* get the next node */
			client_node = avl_get_next(client_node);
		}
		/* release read lock on client_tree */
		avl_tree_unlock(source->client_tree);

		refbuf_release(refbuf);

		/* acquire write lock on client_tree */
		avl_tree_wlock(source->client_tree);

		/** delete bad clients **/
		client_node = avl_get_first(source->client_tree);
		while (client_node) {
			client = (client_t *)client_node->key;
			if (client->con->error) {
				client_node = avl_get_next(client_node);
				avl_delete(source->client_tree, (void *)client, _free_client);
				listeners--;
				stats_event_args(source->mount, "listeners", "%d", listeners);
525
				source->listeners = listeners;
526
                DEBUG0("Client removed");
Jack Moffitt's avatar
Jack Moffitt committed
527 528 529 530 531 532 533 534 535 536 537 538 539
				continue;
			}
			client_node = avl_get_next(client_node);
		}

		/* acquire write lock on pending_tree */
		avl_tree_wlock(source->pending_tree);

		/** add pending clients **/
		client_node = avl_get_first(source->pending_tree);
		while (client_node) {
			avl_insert(source->client_tree, client_node->key);
			listeners++;
540
            DEBUG0("Client added");
Jack Moffitt's avatar
Jack Moffitt committed
541 542 543
			stats_event_inc(NULL, "clients");
			stats_event_inc(source->mount, "connections");
			stats_event_args(source->mount, "listeners", "%d", listeners);
544
			source->listeners = listeners;
Jack Moffitt's avatar
Jack Moffitt committed
545 546 547 548 549 550 551 552 553 554 555 556 557 558

			/* we have to send cached headers for some data formats
			** this is where we queue up the buffers to send
			*/
			if (source->format->has_predata) {
				client = (client_t *)client_node->key;
				client->queue = source->format->get_predata(source->format);
			}

			client_node = avl_get_next(client_node);
		}

		/** clear pending tree **/
		while (avl_get_first(source->pending_tree)) {
559
			avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, source_remove_client);
Jack Moffitt's avatar
Jack Moffitt committed
560 561 562 563 564 565 566 567 568
		}

		/* release write lock on pending_tree */
		avl_tree_unlock(source->pending_tree);

		/* release write lock on client_tree */
		avl_tree_unlock(source->client_tree);
	}

Michael Smith's avatar
Michael Smith committed
569 570
done:

571
    DEBUG0("Source exiting");
572
	if(!suppress_yp) {
573 574
		yp_remove(source);
	}
Jack Moffitt's avatar
Jack Moffitt committed
575

Michael Smith's avatar
Michael Smith committed
576 577 578 579
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

580 581 582 583 584 585 586 587
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
	avl_tree_wlock(global.source_tree);
	avl_delete(global.source_tree, source, source_remove_source);
	avl_tree_unlock(global.source_tree);

Jack Moffitt's avatar
Jack Moffitt committed
588 589 590
	/* we need to empty the client and pending trees */
	avl_tree_wlock(source->pending_tree);
	while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
591 592 593
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
594
            avl_delete(source->pending_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
595

596
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
597 598 599 600 601 602 603
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
    		avl_delete(source->pending_tree, client, _free_client);
        }
Jack Moffitt's avatar
Jack Moffitt committed
604 605
	}
	avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
606

Jack Moffitt's avatar
Jack Moffitt committed
607 608
	avl_tree_wlock(source->client_tree);
	while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
609 610 611
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
612
            avl_delete(source->client_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
613

614
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
615 616 617 618 619 620 621
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
		    avl_delete(source->client_tree, client, _free_client);
        }
Jack Moffitt's avatar
Jack Moffitt committed
622 623 624 625 626 627 628 629 630 631 632
	}
	avl_tree_unlock(source->client_tree);

	/* delete this sources stats */
	stats_event_dec(NULL, "sources");
	stats_event(source->mount, "listeners", NULL);

	global_lock();
	global.sources--;
	global_unlock();

Michael Smith's avatar
Michael Smith committed
633 634 635
    if(source->dumpfile)
        fclose(source->dumpfile);

636 637
    source_free_source(source);

Jack Moffitt's avatar
Jack Moffitt committed
638 639 640 641 642 643 644 645 646 647 648
	/* release our hold on the lock so the main thread can continue cleaning up */
	thread_rwlock_unlock(source->shutdown_rwlock);

	thread_exit(0);
      
	return NULL;
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
	connection_t *cona = (connection_t *)a;
649
    connection_t *conb = (connection_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
650 651 652 653 654 655 656

	if (cona->id < conb->id) return -1;
	if (cona->id > conb->id) return 1;

	return 0;
}

657
/* Key-free callback for avl_delete() that leaves the client untouched: used
 * when a client is only being unlinked from a tree (e.g. moved from the
 * pending tree to the client tree, or handed to a fallback source).
 * Always returns 1.
 */
int source_remove_client(void *key)
{
    (void)key;  /* ownership of the client stays with the caller */

    return 1;
}

/* Key-free callback that really disposes of a client: decrements
 * global.clients under the global lock, decrements the "clients" stat and
 * destroys the client. Always returns 1.
 */
static int _free_client(void *key)
{
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");

    client_destroy(client);

    return 1;
}
675

676 677
/* Parse an "ice-audio-info" value of the form "name=value;name=value;...".
 *
 * For every pair with a non-empty value, the URL-unescaped value is stored
 * in source->audio_info under `name` and published as a stat of the same
 * name on the source's mount. Tokens without '=' and empty tokens are
 * skipped.
 *
 * `s` is modified in place (each ';' is overwritten with a terminator).
 * Tokenising is done locally instead of with strtok(), whose hidden static
 * state is unsafe when several source threads parse headers at once.
 *
 * Returns 1 on success (also for NULL/empty input), 0 if memory for a
 * variable name could not be allocated.
 */
static int _parse_audio_info(source_t *source, char *s)
{
    char *cursor = s;

    while (cursor != NULL && *cursor != '\0') {
        char *token = cursor;
        char *separator = strchr(token, ';');
        char *pvar;
        char *variable;
        char *value;
        size_t name_len;

        /* cut the current token off and remember where the next starts */
        if (separator != NULL) {
            *separator = '\0';
            cursor = separator + 1;
        } else {
            cursor = NULL;
        }

        if (*token == '\0') {
            continue;   /* empty token, e.g. ";;" */
        }

        pvar = strchr(token, '=');
        if (pvar == NULL) {
            continue;   /* not a name=value pair */
        }

        name_len = (size_t)(pvar - token);
        variable = malloc(name_len + 1);
        if (variable == NULL) {
            return 0;
        }
        memcpy(variable, token, name_len);
        variable[name_len] = '\0';  /* the copy must be a terminated string */

        pvar++;
        if (*pvar != '\0') {
            value = util_url_unescape(pvar);
            util_dict_set(source->audio_info, variable, value);
            stats_event(source->mount, variable, value);
            free(value);
        }
        free(variable);
    }
    return 1;
}

/* Replace the heap string held in *slot with a fresh copy of `text`.
 * The previous string is freed; if the copy cannot be allocated the slot is
 * left NULL.
 */
static void _yp_replace_string(char **slot, const char *text)
{
    char *copy = malloc(strlen(text) + 1);

    if (copy != NULL) {
        strcpy(copy, text);
    }
    free(*slot);
    *slot = copy;
}

/* Store `info` in the field selected by `type` of every YP directory entry
 * of `source`.
 *
 * source     source whose ypdata[0 .. num_yp_directories-1] are updated.
 * stat_name  stat key published alongside the string fields that have a
 *            stat (everything except audio info and server type).
 * info       for the string types, a char * that is copied; for
 *            YP_URL_TIMEOUT / YP_LAST_TOUCH / YP_TOUCH_INTERVAL, an integer
 *            that the caller cast to void *. A NULL (or zero) info makes
 *            the call a no-op.
 * type       one of the YP_* selector values; unknown values are ignored.
 *
 * NOTE(review): the stats_event() calls sit inside the per-directory loop,
 * as before, so a stat is published once per directory and not at all when
 * no directory is configured - confirm that is intended.
 */
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type)
{
    char *text = (char *)info;
    int i;

    if (!info) {
        return;
    }
    for (i = 0; i < source->num_yp_directories; i++) {
        switch (type) {
        case YP_SERVER_NAME:
            _yp_replace_string(&source->ypdata[i]->server_name, text);
            stats_event(source->mount, stat_name, text);
            break;
        case YP_SERVER_DESC:
            _yp_replace_string(&source->ypdata[i]->server_desc, text);
            stats_event(source->mount, stat_name, text);
            break;
        case YP_SERVER_GENRE:
            _yp_replace_string(&source->ypdata[i]->server_genre, text);
            stats_event(source->mount, stat_name, text);
            break;
        case YP_SERVER_URL:
            _yp_replace_string(&source->ypdata[i]->server_url, text);
            stats_event(source->mount, stat_name, text);
            break;
        case YP_BITRATE:
            _yp_replace_string(&source->ypdata[i]->bitrate, text);
            stats_event(source->mount, stat_name, text);
            break;
        case YP_AUDIO_INFO:
            _yp_replace_string(&source->ypdata[i]->audio_info, text);
            break;
        case YP_SERVER_TYPE:
            _yp_replace_string(&source->ypdata[i]->server_type, text);
            break;
        case YP_CURRENT_SONG:
            _yp_replace_string(&source->ypdata[i]->current_song, text);
            stats_event(source->mount, stat_name, text);
            break;
        /* numeric fields: the value travels inside the pointer itself; the
         * caller builds it from a long, so convert back through long */
        case YP_URL_TIMEOUT:
            source->ypdata[i]->yp_url_timeout = (long)info;
            break;
        case YP_LAST_TOUCH:
            source->ypdata[i]->yp_last_touch = (long)info;
            break;
        case YP_TOUCH_INTERVAL:
            source->ypdata[i]->yp_touch_interval = (long)info;
            break;
        default:
            break;
        }
    }
}