source.c 26.3 KB
Newer Older
1
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
Jack Moffitt's avatar
Jack Moffitt committed
2 3 4 5
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
6
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
7
#include <errno.h>
8 9

#ifndef _WIN32
10
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
11 12
#include <sys/time.h>
#include <sys/socket.h>
13
#else
14 15
#include <winsock2.h>
#include <windows.h>
16
#endif
Jack Moffitt's avatar
Jack Moffitt committed
17 18 19 20 21 22 23 24 25 26 27

#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "sock.h"

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
28
#include "log.h"
29
#include "logging.h"
30
#include "config.h"
31
#include "util.h"
32
#ifdef HAVE_CURL
33
#include "geturl.h"
34
#endif
Jack Moffitt's avatar
Jack Moffitt committed
35
#include "source.h"
Michael Smith's avatar
Michael Smith committed
36
#include "format.h"
Jack Moffitt's avatar
Jack Moffitt committed
37

38 39 40
#undef CATMODULE
#define CATMODULE "source"

41 42 43 44 45 46 47 48 49 50 51 52
#define  YP_SERVER_NAME 1
#define  YP_SERVER_DESC 2
#define  YP_SERVER_GENRE 3
#define  YP_SERVER_URL 4
#define  YP_BITRATE 5
#define  YP_AUDIO_INFO 6
#define  YP_SERVER_TYPE 7
#define  YP_CURRENT_SONG 8
#define  YP_URL_TIMEOUT 9
#define  YP_TOUCH_INTERVAL 10
#define  YP_LAST_TOUCH 11

Jack Moffitt's avatar
Jack Moffitt committed
53 54 55
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
56
static int _parse_audio_info(source_t *source, char *s);
57
#ifdef HAVE_CURL
58 59
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type);
60
#endif
Jack Moffitt's avatar
Jack Moffitt committed
61

62
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
63 64
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
65
{
66
    source_t *src;
Jack Moffitt's avatar
Jack Moffitt committed
67

68
    src = (source_t *)malloc(sizeof(source_t));
69
    src->client = client;
70
    src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
71
    src->fallback_mount = NULL;
72 73 74 75 76
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
77
    src->running = 1;
78 79
    src->num_yp_directories = 0;
    src->listeners = 0;
80
    src->max_listeners = -1;
81
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
82 83
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
84
    src->audio_info = util_dict_new();
Jack Moffitt's avatar
Jack Moffitt committed
85

Michael Smith's avatar
Michael Smith committed
86 87 88 89 90 91 92 93 94 95 96 97 98 99
    if(mountinfo != NULL) {
        src->fallback_mount = mountinfo->fallback_mount;
        src->max_listeners = mountinfo->max_listeners;
        src->dumpfilename = mountinfo->dumpfile;
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

100
    return src;
Jack Moffitt's avatar
Jack Moffitt committed
101 102
}

103 104 105 106 107
static int source_remove_source(void *key)
{
    return 1;
}

Jack Moffitt's avatar
Jack Moffitt committed
108 109 110 111 112
/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
136 137 138 139
}

int source_compare_sources(void *arg, void *a, void *b)
{
140 141
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
142

143
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
144 145 146 147
}

int source_free_source(void *key)
{
148
    source_t *source = key;
149
    int i=0;
Jack Moffitt's avatar
Jack Moffitt committed
150

151
    free(source->mount);
Michael Smith's avatar
Michael Smith committed
152
    free(source->fallback_mount);
153
    client_destroy(source->client);
154 155 156
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
    source->format->free_plugin(source->format);
157
#ifdef HAVE_CURL
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
158 159 160
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
161
#endif
162
    util_dict_free(source->audio_info);
163
    free(source);
Jack Moffitt's avatar
Jack Moffitt committed
164

165
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
166
}
167
    
Jack Moffitt's avatar
Jack Moffitt committed
168 169 170

void *source_main(void *arg)
{
171
    source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
172
    source_t *fallback_source;
173 174 175 176 177 178 179 180 181 182 183
    char buffer[4096];
    long bytes, sbytes;
    int ret, timeout;
    client_t *client;
    avl_node *client_node;
    char *s;
    long current_time;
    char current_song[256];

    refbuf_t *refbuf, *abuf;
    int data_done;
Jack Moffitt's avatar
Jack Moffitt committed
184

185
    int listeners = 0;
186 187
    int    i=0;
    int    suppress_yp = 0;
188
    char *ai;
Jack Moffitt's avatar
Jack Moffitt committed
189

Michael Smith's avatar
Michael Smith committed
190 191 192 193 194 195 196 197
    long queue_limit;
    ice_config_t *config;
    char *hostname;
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
198
    timeout = config->source_timeout;
Michael Smith's avatar
Michael Smith committed
199 200 201
    hostname = config->hostname;
    port = config->port;

202
#ifdef HAVE_CURL
203 204 205 206
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
Michael Smith's avatar
Michael Smith committed
207
                config->yp_url[i];
208
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
Michael Smith's avatar
Michael Smith committed
209
                config->yp_url_timeout[i];
210 211 212 213
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
214 215
#endif
    
Michael Smith's avatar
Michael Smith committed
216
    config_release_config();
217

218 219
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
220

221 222 223 224 225 226 227 228 229 230 231 232
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
    if (source_find_mount(source->mount) != NULL) {
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
        thread_exit(0);
        return NULL;
    }
233 234 235 236
    /* insert source onto source tree */
    avl_insert(global.source_tree, (void *)source);
    /* release write lock on global source tree */
    avl_tree_unlock(global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
237

238 239 240 241 242 243 244 245 246
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
247

248 249
    /* start off the statistics */
    source->listeners = 0;
250 251 252
    stats_event(source->mount, "listeners", "0");
    stats_event(source->mount, "type", source->format->format_description);
#ifdef HAVE_CURL
253
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
254
        _add_yp_info(source, "server_name", s, YP_SERVER_NAME);
255 256
    }
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
257
        _add_yp_info(source, "server_url", s, YP_SERVER_URL);
258 259
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
260
        _add_yp_info(source, "genre", s, YP_SERVER_GENRE);
261 262
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
263
        _add_yp_info(source, "bitrate", s, YP_BITRATE);
264 265
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
266
        _add_yp_info(source, "server_description", s, YP_SERVER_DESC);
267 268 269 270 271 272
    }
    if ((s = httpp_getvar(source->parser, "ice-private"))) {
        stats_event(source->mount, "public", s);
        suppress_yp = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
273 274
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
275
            _add_yp_info(source, "audio_info", 
276 277 278
                    ai,
                    YP_AUDIO_INFO);
        }
279 280 281 282 283 284
    }
    for (i=0;i<source->num_yp_directories;i++) {
        if (source->ypdata[i]->server_type) {
            free(source->ypdata[i]->server_type);
        }
        source->ypdata[i]->server_type = malloc(
285
                strlen(source->format->format_description) + 1);
286
        strcpy(source->ypdata[i]->server_type, 
287
                source->format->format_description);
288
    }
Jack Moffitt's avatar
Jack Moffitt committed
289

290
    for (i=0;i<source->num_yp_directories;i++) {
291
        int listen_url_size;
292 293 294 295 296
        if (source->ypdata[i]->listen_url) {
            free(source->ypdata[i]->listen_url);
        }
        /* 6 for max size of port */
        listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
297
            strlen(hostname) + 
298
            strlen(":") + 6 + strlen(source->mount) + 1;
299 300
        source->ypdata[i]->listen_url = malloc(listen_url_size);
        sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
301
                hostname, port, source->mount);
302
    }
303

304
    if(!suppress_yp) {
305 306
        yp_add(source, YP_ADD_ALL);

307
        current_time = time(NULL);
308

309 310 311
        _add_yp_info(source, "last_touch", (void *)current_time, 
            YP_LAST_TOUCH);

312
        for (i=0;i<source->num_yp_directories;i++) {
313 314 315 316
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
            source->ypdata[i]->yp_last_touch = current_time - 
                source->ypdata[i]->yp_touch_interval + 5;
317
            /* Don't permit touch intervals of less than 30 seconds */
318 319 320 321
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
        }
322
    }
323
#endif
324

325 326
    DEBUG0("Source creation complete");

327
    while (global.running == ICE_RUNNING && source->running) {
328
#ifdef HAVE_CURL
329
        if(!suppress_yp) {
330
            current_time = time(NULL);
331 332
            for (i=0;i<source->num_yp_directories;i++) {
                if (current_time > (source->ypdata[i]->yp_last_touch + 
333 334
                            source->ypdata[i]->yp_touch_interval)) {
                    current_song[0] = 0;
335 336
                    if (stats_get_value(source->mount, "artist")) {
                        strncat(current_song, 
337 338
                                stats_get_value(source->mount, "artist"), 
                                sizeof(current_song) - 1);
339 340 341 342 343 344
                        if (strlen(current_song) + 4 < sizeof(current_song)) {
                            strncat(current_song, " - ", 3);
                        }
                    }
                    if (stats_get_value(source->mount, "title")) {
                        if (strlen(current_song) + 
345 346 347
                                strlen(stats_get_value(source->mount, "title"))
                                < sizeof(current_song) -1) 
                        {
348
                            strncat(current_song, 
349 350 351
                                    stats_get_value(source->mount, "title"), 
                                    sizeof(current_song) - 1 - 
                                    strlen(current_song));
352 353
                        }
                    }
354
                    
355 356 357 358 359 360
                    if (source->ypdata[i]->current_song) {
                        free(source->ypdata[i]->current_song);
                        source->ypdata[i]->current_song = NULL;
                    }
    
                    source->ypdata[i]->current_song = 
361
                        malloc(strlen(current_song) + 1);
362 363 364
                    strcpy(source->ypdata[i]->current_song, current_song);
    
                    thread_create("YP Touch Thread", yp_touch_thread, 
365
                            (void *)source, THREAD_DETACHED); 
366 367 368
                }
            }
        }
369
#endif
370
        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
371 372 373 374
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
375
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
376 377 378
        while (refbuf == NULL) {
            bytes = 0;
            while (bytes <= 0) {
379 380
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

381
                if (ret <= 0) { /* timeout expired */
382 383
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
384 385 386
                    bytes = 0;
                    break;
                }
Jack Moffitt's avatar
Jack Moffitt committed
387

388 389
                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
                if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) {
390 391
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
392
                    break;
393
                }
394 395
            }
            if (bytes <= 0) break;
396
            source->client->con->sent_bytes += bytes;
397
            ret = source->format->get_buffer(source->format, buffer, bytes, &refbuf);
Michael Smith's avatar
Michael Smith committed
398 399 400 401
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
        }

        if (bytes <= 0) {
            INFO0("Removing source following disconnection");
            break;
        }

        /* we have a refbuf buffer, which a data block to be sent to 
        ** all clients.  if a client is not able to send the buffer
        ** immediately, it should store it on its queue for the next
        ** go around.
        **
        ** instead of sending the current block, a client should send
        ** all data in the queue, plus the current block, until either
        ** it runs out of data, or it hits a recoverable error like
        ** EAGAIN.  this will allow a client that got slightly lagged
        ** to catch back up if it can
        */
Jack Moffitt's avatar
Jack Moffitt committed
420

Michael Smith's avatar
Michael Smith committed
421 422 423 424 425 426 427 428 429 430 431 432
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

433 434
        /* acquire read lock on client_tree */
        avl_tree_rlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
435

436 437 438 439
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            /* acquire read lock on node */
            avl_node_wlock(client_node);
Jack Moffitt's avatar
Jack Moffitt committed
440

441 442 443
            client = (client_t *)client_node->key;
            
            data_done = 0;
Jack Moffitt's avatar
Jack Moffitt committed
444

445 446 447 448 449 450 451
            /* do we have any old buffers? */
            abuf = refbuf_queue_remove(&client->queue);
            while (abuf) {
                if (client->pos > 0)
                    bytes = abuf->len - client->pos;
                else
                    bytes = abuf->len;
Jack Moffitt's avatar
Jack Moffitt committed
452

453
                sbytes = source->format->write_buf_to_client(source->format,
454
                        client, &abuf->data[client->pos], bytes);
455
                if (sbytes >= 0) {
456
                    if(sbytes != bytes) {
457 458 459
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
460 461 462 463 464 465
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
466 467 468
                else {
                    DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected");
                    client->con->error = 1;
469
                    data_done = 1;
470
                    refbuf_release(abuf);
471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
                    break;
                }
                
                /* we're done with that refbuf, release it and reset the pos */
                refbuf_release(abuf);
                client->pos = 0;

                abuf = refbuf_queue_remove(&client->queue);
            }
            
            /* now send or queue the new data */
            if (data_done) {
                refbuf_addref(refbuf);
                refbuf_queue_add(&client->queue, refbuf);
            } else {
                sbytes = source->format->write_buf_to_client(source->format,
487
                        client, refbuf->data, refbuf->len);
488
                if (sbytes >= 0) {
489 490
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
491
                        client->pos = sbytes;
492
                        refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
493
                        refbuf_queue_insert(&client->queue, refbuf);
494 495
                    }
                }
496 497 498
                else {
                    DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection");
                    client->con->error = 1;
499 500 501 502 503 504 505 506
                }
            }

            /* if the client is too slow, its queue will slowly build up.
            ** we need to make sure the client is keeping up with the
            ** data, so we'll kick any client who's queue gets to large.
            */
            if (refbuf_queue_length(&client->queue) > queue_limit) {
507
                DEBUG0("Client has fallen too far behind, removing");
508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534
                client->con->error = 1;
            }

            /* release read lock on node */
            avl_node_unlock(client_node);

            /* get the next node */
            client_node = avl_get_next(client_node);
        }
        /* release read lock on client_tree */
        avl_tree_unlock(source->client_tree);

        refbuf_release(refbuf);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** delete bad clients **/
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
            if (client->con->error) {
                client_node = avl_get_next(client_node);
                avl_delete(source->client_tree, (void *)client, _free_client);
                listeners--;
                stats_event_args(source->mount, "listeners", "%d", listeners);
                source->listeners = listeners;
535
                DEBUG0("Client removed");
536 537 538 539 540 541 542 543 544 545 546 547 548
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
            avl_insert(source->client_tree, client_node->key);
            listeners++;
549
            DEBUG0("Client added");
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576
            stats_event_inc(NULL, "clients");
            stats_event_inc(source->mount, "connections");
            stats_event_args(source->mount, "listeners", "%d", listeners);
            source->listeners = listeners;

            /* we have to send cached headers for some data formats
            ** this is where we queue up the buffers to send
            */
            if (source->format->has_predata) {
                client = (client_t *)client_node->key;
                client->queue = source->format->get_predata(source->format);
            }

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
            avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, source_remove_client);
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
577

Michael Smith's avatar
Michael Smith committed
578 579
done:

580
    DEBUG0("Source exiting");
581 582

#ifdef HAVE_CURL
583 584 585
    if(!suppress_yp) {
        yp_remove(source);
    }
586 587
#endif
    
Michael Smith's avatar
Michael Smith committed
588 589 590 591
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

592 593 594 595
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
596 597 598
    avl_tree_wlock(global.source_tree);
    avl_delete(global.source_tree, source, source_remove_source);
    avl_tree_unlock(global.source_tree);
599

600 601 602
    /* we need to empty the client and pending trees */
    avl_tree_wlock(source->pending_tree);
    while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
603 604 605
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
606
            avl_delete(source->pending_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
607

608
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
609 610 611 612 613
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
614
            avl_delete(source->pending_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
615
        }
616 617
    }
    avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
618

619 620
    avl_tree_wlock(source->client_tree);
    while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
621 622 623
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
624
            avl_delete(source->client_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
625

626
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
627 628 629 630 631
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
632
            avl_delete(source->client_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
633
        }
634 635
    }
    avl_tree_unlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
636

637 638 639
    /* delete this sources stats */
    stats_event_dec(NULL, "sources");
    stats_event(source->mount, "listeners", NULL);
Jack Moffitt's avatar
Jack Moffitt committed
640

641 642 643
    global_lock();
    global.sources--;
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
644

Michael Smith's avatar
Michael Smith committed
645 646 647
    if(source->dumpfile)
        fclose(source->dumpfile);

648 649
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
650

651 652
    source_free_source(source);

653
    thread_exit(0);
Jack Moffitt's avatar
Jack Moffitt committed
654
      
655
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
656 657 658 659
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
660
    connection_t *cona = (connection_t *)a;
661
    connection_t *conb = (connection_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
662

663 664
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
665

666
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
667 668
}

669
int source_remove_client(void *key)
Jack Moffitt's avatar
Jack Moffitt committed
670
{
671
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
672 673 674 675
}

static int _free_client(void *key)
{
676 677 678 679 680 681 682 683 684 685
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");
    
    client_destroy(client);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
686
}
687

688 689
static int _parse_audio_info(source_t *source, char *s)
{
690 691 692 693
    char *token = NULL;
    char *pvar = NULL;
    char *variable = NULL;
    char *value = NULL;
694 695 696 697 698

    while ((token = strtok(s,";")) != NULL) {
        pvar = strchr(token, '=');
        if (pvar) {
            variable = (char *)malloc(pvar-token+1);
699
            strncpy(variable, token, pvar-token);    
700
            variable[pvar-token] = 0;
701 702
            pvar++;
            if (strlen(pvar)) {
703
                value = util_url_unescape(pvar);
704 705
                util_dict_set(source->audio_info, variable, value);
                stats_event(source->mount, variable, value);
706 707 708
                if (value) {
                    free(value);
                }
709 710
            }
            if (variable) {
711
                free(variable);
712 713 714 715 716 717 718
            }
        }
        s = NULL;
    }
    return 1;
}

719
#ifdef HAVE_CURL
720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type)
{
    int i;
    if (!info) {
        return;
    }
    for (i=0;i<source->num_yp_directories;i++) {
        switch (type) {
        case YP_SERVER_NAME:
        if (source->ypdata[i]->server_name) {
                free(source->ypdata[i]->server_name);
                }
                source->ypdata[i]->server_name = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->server_name, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_SERVER_DESC:
                if (source->ypdata[i]->server_desc) {
                    free(source->ypdata[i]->server_desc);
                }
                source->ypdata[i]->server_desc = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->server_desc, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_SERVER_GENRE:
                if (source->ypdata[i]->server_genre) {
                    free(source->ypdata[i]->server_genre);
                }
                source->ypdata[i]->server_genre = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->server_genre, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_SERVER_URL:
                if (source->ypdata[i]->server_url) {
                    free(source->ypdata[i]->server_url);
                }
                source->ypdata[i]->server_url = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->server_url, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_BITRATE:
                if (source->ypdata[i]->bitrate) {
                    free(source->ypdata[i]->bitrate);
                }
                source->ypdata[i]->bitrate = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->bitrate, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_AUDIO_INFO:
                if (source->ypdata[i]->audio_info) {
                    free(source->ypdata[i]->audio_info);
                }
                source->ypdata[i]->audio_info = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->audio_info, (char *)info);
                break;
        case YP_SERVER_TYPE:
                if (source->ypdata[i]->server_type) {
                    free(source->ypdata[i]->server_type);
                }
                source->ypdata[i]->server_type = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->server_type, (char *)info);
                break;
        case YP_CURRENT_SONG:
                if (source->ypdata[i]->current_song) {
                    free(source->ypdata[i]->current_song);
                }
                source->ypdata[i]->current_song = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->current_song, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_URL_TIMEOUT:
                source->ypdata[i]->yp_url_timeout = (int)info;
                break;
        case YP_LAST_TOUCH:
                source->ypdata[i]->yp_last_touch = (int)info;
                break;
        case YP_TOUCH_INTERVAL:
                source->ypdata[i]->yp_touch_interval = (int)info;
                break;
        }
    }
}
811
#endif