source.c 26.9 KB
Newer Older
1
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
Jack Moffitt's avatar
Jack Moffitt committed
2 3 4 5
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
6
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
7
#include <errno.h>
8 9

#ifndef _WIN32
10
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
11 12
#include <sys/time.h>
#include <sys/socket.h>
13
#else
14 15
#include <winsock2.h>
#include <windows.h>
16
#endif
Jack Moffitt's avatar
Jack Moffitt committed
17 18 19 20 21 22 23 24 25 26 27

#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "sock.h"

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
28
#include "log.h"
29
#include "logging.h"
30
#include "config.h"
31
#include "util.h"
32
#ifdef HAVE_CURL
33
#include "geturl.h"
34
#endif
Jack Moffitt's avatar
Jack Moffitt committed
35
#include "source.h"
Michael Smith's avatar
Michael Smith committed
36
#include "format.h"
Jack Moffitt's avatar
Jack Moffitt committed
37

38 39 40
#undef CATMODULE
#define CATMODULE "source"

41 42 43 44 45 46 47 48 49 50 51 52
#define  YP_SERVER_NAME 1
#define  YP_SERVER_DESC 2
#define  YP_SERVER_GENRE 3
#define  YP_SERVER_URL 4
#define  YP_BITRATE 5
#define  YP_AUDIO_INFO 6
#define  YP_SERVER_TYPE 7
#define  YP_CURRENT_SONG 8
#define  YP_URL_TIMEOUT 9
#define  YP_TOUCH_INTERVAL 10
#define  YP_LAST_TOUCH 11

Jack Moffitt's avatar
Jack Moffitt committed
53 54 55
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
56
static int _parse_audio_info(source_t *source, char *s);
57
#ifdef HAVE_CURL
58 59
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type);
60
#endif
Jack Moffitt's avatar
Jack Moffitt committed
61

62
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
63 64
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
65
{
66
    source_t *src;
Jack Moffitt's avatar
Jack Moffitt committed
67

68
    src = (source_t *)malloc(sizeof(source_t));
69
    src->client = client;
70
    src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
71
    src->fallback_mount = NULL;
72 73 74 75 76
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
77
    src->running = 1;
78 79
    src->num_yp_directories = 0;
    src->listeners = 0;
80
    src->max_listeners = -1;
81
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
82 83
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
84
    src->audio_info = util_dict_new();
Jack Moffitt's avatar
Jack Moffitt committed
85

Michael Smith's avatar
Michael Smith committed
86 87 88 89 90 91 92 93 94 95 96 97 98 99
    if(mountinfo != NULL) {
        src->fallback_mount = mountinfo->fallback_mount;
        src->max_listeners = mountinfo->max_listeners;
        src->dumpfilename = mountinfo->dumpfile;
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

100
    return src;
Jack Moffitt's avatar
Jack Moffitt committed
101 102
}

103 104 105 106 107
/* Key callback that leaves the key untouched; source_main() passes it to
 * avl_delete() when it unlinks itself from the global source tree.
 * Always returns 1.
 */
static int source_remove_source(void *key)
{
    (void)key;  /* intentionally unused */

    return 1;
}

Jack Moffitt's avatar
Jack Moffitt committed
108 109 110 111 112
/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
136 137 138 139
}

/* Ordering callback for the source tree: sources are ordered by the
 * strcmp() result of their mount names. `arg` is unused.
 */
int source_compare_sources(void *arg, void *a, void *b)
{
    const char *left_mount = ((source_t *)a)->mount;
    const char *right_mount = ((source_t *)b)->mount;

    (void)arg;

    return strcmp(left_mount, right_mount);
}

int source_free_source(void *key)
{
148
    source_t *source = key;
149
    int i=0;
Jack Moffitt's avatar
Jack Moffitt committed
150

151
    free(source->mount);
Michael Smith's avatar
Michael Smith committed
152
    free(source->fallback_mount);
153
    client_destroy(source->client);
154 155 156
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
    source->format->free_plugin(source->format);
157
#ifdef HAVE_CURL
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
158 159 160
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
161
#endif
162
    util_dict_free(source->audio_info);
163
    free(source);
Jack Moffitt's avatar
Jack Moffitt committed
164

165
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
166
}
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186

/* Find the listener with connection id `id` in the source's client tree.
 *
 * The lookup key is a stack-allocated stand-in client whose connection
 * carries only the id. The client tree is read-locked for the duration of
 * the lookup and unlocked before returning.
 *
 * Returns the client, or NULL if no client with that id is present.
 */
client_t *source_find_client(source_t *source, int id)
{
    connection_t probe_con;
    client_t probe;
    client_t *found;

    probe_con.id = id;
    probe.con = &probe_con;

    avl_tree_rlock(source->client_tree);
    if (avl_get_by_key(source->client_tree, &probe, (void **)&found) != 0)
        found = NULL;
    avl_tree_unlock(source->client_tree);

    return found;
}
187
    
Jack Moffitt's avatar
Jack Moffitt committed
188 189 190

void *source_main(void *arg)
{
191
    source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
192
    source_t *fallback_source;
193 194 195 196 197 198 199 200 201 202 203
    char buffer[4096];
    long bytes, sbytes;
    int ret, timeout;
    client_t *client;
    avl_node *client_node;
    char *s;
    long current_time;
    char current_song[256];

    refbuf_t *refbuf, *abuf;
    int data_done;
Jack Moffitt's avatar
Jack Moffitt committed
204

205
    int listeners = 0;
206 207
    int    i=0;
    int    suppress_yp = 0;
208
    char *ai;
Jack Moffitt's avatar
Jack Moffitt committed
209

Michael Smith's avatar
Michael Smith committed
210 211 212 213 214 215 216 217
    long queue_limit;
    ice_config_t *config;
    char *hostname;
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
218
    timeout = config->source_timeout;
Michael Smith's avatar
Michael Smith committed
219 220 221
    hostname = config->hostname;
    port = config->port;

222
#ifdef HAVE_CURL
223 224 225 226
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
Michael Smith's avatar
Michael Smith committed
227
                config->yp_url[i];
228
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
Michael Smith's avatar
Michael Smith committed
229
                config->yp_url_timeout[i];
230 231 232 233
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
234 235
#endif
    
Michael Smith's avatar
Michael Smith committed
236
    config_release_config();
237

238 239
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
240

241 242 243 244 245 246 247 248 249 250 251 252
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
    if (source_find_mount(source->mount) != NULL) {
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
        thread_exit(0);
        return NULL;
    }
253 254 255 256
    /* insert source onto source tree */
    avl_insert(global.source_tree, (void *)source);
    /* release write lock on global source tree */
    avl_tree_unlock(global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
257

258 259 260 261 262 263 264 265 266
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
267

268 269
    /* start off the statistics */
    source->listeners = 0;
270 271 272
    stats_event(source->mount, "listeners", "0");
    stats_event(source->mount, "type", source->format->format_description);
#ifdef HAVE_CURL
273
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
274
        _add_yp_info(source, "server_name", s, YP_SERVER_NAME);
275 276
    }
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
277
        _add_yp_info(source, "server_url", s, YP_SERVER_URL);
278 279
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
280
        _add_yp_info(source, "genre", s, YP_SERVER_GENRE);
281 282
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
283
        _add_yp_info(source, "bitrate", s, YP_BITRATE);
284 285
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
286
        _add_yp_info(source, "server_description", s, YP_SERVER_DESC);
287 288 289 290 291 292
    }
    if ((s = httpp_getvar(source->parser, "ice-private"))) {
        stats_event(source->mount, "public", s);
        suppress_yp = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
293 294
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
295
            _add_yp_info(source, "audio_info", 
296 297 298
                    ai,
                    YP_AUDIO_INFO);
        }
299 300 301 302 303 304
    }
    for (i=0;i<source->num_yp_directories;i++) {
        if (source->ypdata[i]->server_type) {
            free(source->ypdata[i]->server_type);
        }
        source->ypdata[i]->server_type = malloc(
305
                strlen(source->format->format_description) + 1);
306
        strcpy(source->ypdata[i]->server_type, 
307
                source->format->format_description);
308
    }
Jack Moffitt's avatar
Jack Moffitt committed
309

310
    for (i=0;i<source->num_yp_directories;i++) {
311
        int listen_url_size;
312 313 314 315 316
        if (source->ypdata[i]->listen_url) {
            free(source->ypdata[i]->listen_url);
        }
        /* 6 for max size of port */
        listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
317
            strlen(hostname) + 
318
            strlen(":") + 6 + strlen(source->mount) + 1;
319 320
        source->ypdata[i]->listen_url = malloc(listen_url_size);
        sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
321
                hostname, port, source->mount);
322
    }
323

324
    if(!suppress_yp) {
325 326
        yp_add(source, YP_ADD_ALL);

327
        current_time = time(NULL);
328

329 330 331
        _add_yp_info(source, "last_touch", (void *)current_time, 
            YP_LAST_TOUCH);

332
        for (i=0;i<source->num_yp_directories;i++) {
333 334 335 336
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
            source->ypdata[i]->yp_last_touch = current_time - 
                source->ypdata[i]->yp_touch_interval + 5;
337
            /* Don't permit touch intervals of less than 30 seconds */
338 339 340 341
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
        }
342
    }
343
#endif
344

345 346
    DEBUG0("Source creation complete");

347
    while (global.running == ICE_RUNNING && source->running) {
348
#ifdef HAVE_CURL
349
        if(!suppress_yp) {
350
            current_time = time(NULL);
351 352
            for (i=0;i<source->num_yp_directories;i++) {
                if (current_time > (source->ypdata[i]->yp_last_touch + 
353 354
                            source->ypdata[i]->yp_touch_interval)) {
                    current_song[0] = 0;
355 356
                    if (stats_get_value(source->mount, "artist")) {
                        strncat(current_song, 
357 358
                                stats_get_value(source->mount, "artist"), 
                                sizeof(current_song) - 1);
359 360 361 362 363 364
                        if (strlen(current_song) + 4 < sizeof(current_song)) {
                            strncat(current_song, " - ", 3);
                        }
                    }
                    if (stats_get_value(source->mount, "title")) {
                        if (strlen(current_song) + 
365 366 367
                                strlen(stats_get_value(source->mount, "title"))
                                < sizeof(current_song) -1) 
                        {
368
                            strncat(current_song, 
369 370 371
                                    stats_get_value(source->mount, "title"), 
                                    sizeof(current_song) - 1 - 
                                    strlen(current_song));
372 373
                        }
                    }
374
                    
375 376 377 378 379 380
                    if (source->ypdata[i]->current_song) {
                        free(source->ypdata[i]->current_song);
                        source->ypdata[i]->current_song = NULL;
                    }
    
                    source->ypdata[i]->current_song = 
381
                        malloc(strlen(current_song) + 1);
382 383 384
                    strcpy(source->ypdata[i]->current_song, current_song);
    
                    thread_create("YP Touch Thread", yp_touch_thread, 
385
                            (void *)source, THREAD_DETACHED); 
386 387 388
                }
            }
        }
389
#endif
390
        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
391 392 393 394
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
395
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
396 397 398
        while (refbuf == NULL) {
            bytes = 0;
            while (bytes <= 0) {
399 400
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

401
                if (ret <= 0) { /* timeout expired */
402 403
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
404 405 406
                    bytes = 0;
                    break;
                }
Jack Moffitt's avatar
Jack Moffitt committed
407

408 409
                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
                if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) {
410 411
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
412
                    break;
413
                }
414 415
            }
            if (bytes <= 0) break;
416
            source->client->con->sent_bytes += bytes;
417
            ret = source->format->get_buffer(source->format, buffer, bytes, &refbuf);
Michael Smith's avatar
Michael Smith committed
418 419 420 421
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
        }

        if (bytes <= 0) {
            INFO0("Removing source following disconnection");
            break;
        }

        /* we have a refbuf buffer, which a data block to be sent to 
        ** all clients.  if a client is not able to send the buffer
        ** immediately, it should store it on its queue for the next
        ** go around.
        **
        ** instead of sending the current block, a client should send
        ** all data in the queue, plus the current block, until either
        ** it runs out of data, or it hits a recoverable error like
        ** EAGAIN.  this will allow a client that got slightly lagged
        ** to catch back up if it can
        */
Jack Moffitt's avatar
Jack Moffitt committed
440

Michael Smith's avatar
Michael Smith committed
441 442 443 444 445 446 447 448 449 450 451 452
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

453 454
        /* acquire read lock on client_tree */
        avl_tree_rlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
455

456 457 458 459
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            /* acquire read lock on node */
            avl_node_wlock(client_node);
Jack Moffitt's avatar
Jack Moffitt committed
460

461 462 463
            client = (client_t *)client_node->key;
            
            data_done = 0;
Jack Moffitt's avatar
Jack Moffitt committed
464

465 466 467 468 469 470 471
            /* do we have any old buffers? */
            abuf = refbuf_queue_remove(&client->queue);
            while (abuf) {
                if (client->pos > 0)
                    bytes = abuf->len - client->pos;
                else
                    bytes = abuf->len;
Jack Moffitt's avatar
Jack Moffitt committed
472

473
                sbytes = source->format->write_buf_to_client(source->format,
474
                        client, &abuf->data[client->pos], bytes);
475
                if (sbytes >= 0) {
476
                    if(sbytes != bytes) {
477 478 479
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
480 481 482 483 484 485
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
486 487 488
                else {
                    DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected");
                    client->con->error = 1;
489
                    data_done = 1;
490
                    refbuf_release(abuf);
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506
                    break;
                }
                
                /* we're done with that refbuf, release it and reset the pos */
                refbuf_release(abuf);
                client->pos = 0;

                abuf = refbuf_queue_remove(&client->queue);
            }
            
            /* now send or queue the new data */
            if (data_done) {
                refbuf_addref(refbuf);
                refbuf_queue_add(&client->queue, refbuf);
            } else {
                sbytes = source->format->write_buf_to_client(source->format,
507
                        client, refbuf->data, refbuf->len);
508
                if (sbytes >= 0) {
509 510
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
511
                        client->pos = sbytes;
512
                        refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
513
                        refbuf_queue_insert(&client->queue, refbuf);
514 515
                    }
                }
516 517 518
                else {
                    DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection");
                    client->con->error = 1;
519 520 521 522 523 524 525 526
                }
            }

            /* if the client is too slow, its queue will slowly build up.
            ** we need to make sure the client is keeping up with the
            ** data, so we'll kick any client who's queue gets to large.
            */
            if (refbuf_queue_length(&client->queue) > queue_limit) {
527
                DEBUG0("Client has fallen too far behind, removing");
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554
                client->con->error = 1;
            }

            /* release read lock on node */
            avl_node_unlock(client_node);

            /* get the next node */
            client_node = avl_get_next(client_node);
        }
        /* release read lock on client_tree */
        avl_tree_unlock(source->client_tree);

        refbuf_release(refbuf);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** delete bad clients **/
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
            if (client->con->error) {
                client_node = avl_get_next(client_node);
                avl_delete(source->client_tree, (void *)client, _free_client);
                listeners--;
                stats_event_args(source->mount, "listeners", "%d", listeners);
                source->listeners = listeners;
555
                DEBUG0("Client removed");
556 557 558 559 560 561 562 563 564 565 566 567 568
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
            avl_insert(source->client_tree, client_node->key);
            listeners++;
569
            DEBUG0("Client added");
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
            stats_event_inc(NULL, "clients");
            stats_event_inc(source->mount, "connections");
            stats_event_args(source->mount, "listeners", "%d", listeners);
            source->listeners = listeners;

            /* we have to send cached headers for some data formats
            ** this is where we queue up the buffers to send
            */
            if (source->format->has_predata) {
                client = (client_t *)client_node->key;
                client->queue = source->format->get_predata(source->format);
            }

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
            avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, source_remove_client);
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
597

Michael Smith's avatar
Michael Smith committed
598 599
done:

600
    DEBUG0("Source exiting");
601 602

#ifdef HAVE_CURL
603 604 605
    if(!suppress_yp) {
        yp_remove(source);
    }
606 607
#endif
    
Michael Smith's avatar
Michael Smith committed
608 609 610 611
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

612 613 614 615
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
616 617 618
    avl_tree_wlock(global.source_tree);
    avl_delete(global.source_tree, source, source_remove_source);
    avl_tree_unlock(global.source_tree);
619

620 621 622
    /* we need to empty the client and pending trees */
    avl_tree_wlock(source->pending_tree);
    while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
623 624 625
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
626
            avl_delete(source->pending_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
627

628
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
629 630 631 632 633
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
634
            avl_delete(source->pending_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
635
        }
636 637
    }
    avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
638

639 640
    avl_tree_wlock(source->client_tree);
    while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
641 642 643
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
644
            avl_delete(source->client_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
645

646
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
647 648 649 650 651
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
652
            avl_delete(source->client_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
653
        }
654 655
    }
    avl_tree_unlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
656

657 658 659
    /* delete this sources stats */
    stats_event_dec(NULL, "sources");
    stats_event(source->mount, "listeners", NULL);
Jack Moffitt's avatar
Jack Moffitt committed
660

661 662 663
    global_lock();
    global.sources--;
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
664

Michael Smith's avatar
Michael Smith committed
665 666 667
    if(source->dumpfile)
        fclose(source->dumpfile);

668 669
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
670

671 672
    source_free_source(source);

673
    thread_exit(0);
Jack Moffitt's avatar
Jack Moffitt committed
674
      
675
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
676 677 678 679
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
680 681 682 683 684
    client_t *clienta = (client_t *)a;
    client_t *clientb = (client_t *)b;

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
685

686 687
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
688

689
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
690 691
}

692
/* Key callback that deliberately does nothing with the key. source_main()
 * passes it to avl_delete() when a client has to be unlinked from a tree
 * without being destroyed (it has been, or is about to be, inserted into
 * another tree). Always returns 1.
 */
int source_remove_client(void *key)
{
    (void)key;  /* unused on purpose: the client must stay alive */

    return 1;
}

static int _free_client(void *key)
{
699 700 701 702 703 704 705 706 707 708
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");
    
    client_destroy(client);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
709
}
710

711 712
/* Parse an "ice-audio-info" style string of the form
 * "name=value;name=value;..." and record each pair.
 *
 * For every ';'-separated token that contains '=' and has a non-empty
 * value, the value is passed through util_url_unescape() and stored both
 * in source->audio_info and as a stat on the source's mount. Tokens
 * without '=' or with an empty value are ignored.
 *
 * s is split in place (each ';' that ends a token is overwritten with a
 * NUL), exactly as strtok() would do, but without strtok()'s hidden
 * static state: sources run on their own threads, so the tokenizer must
 * not share state between calls.
 *
 * Returns 1 on success, 0 if s is NULL or memory runs out.
 */
static int _parse_audio_info(source_t *source, char *s)
{
    char *cursor = s;

    if (s == NULL) {
        return 0;
    }

    for (;;) {
        char *token;
        char *pvar;
        char *variable;
        char *value;
        size_t name_len;

        /* skip leading/consecutive separators; stop at end of input */
        cursor += strspn(cursor, ";");
        if (*cursor == '\0') {
            break;
        }

        /* terminate this token and remember where the next one starts */
        token = cursor;
        cursor += strcspn(cursor, ";");
        if (*cursor != '\0') {
            *cursor = '\0';
            cursor++;
        }

        pvar = strchr(token, '=');
        if (pvar == NULL) {
            continue;
        }

        name_len = (size_t)(pvar - token);
        variable = (char *)malloc(name_len + 1);
        if (variable == NULL) {
            return 0;
        }
        memcpy(variable, token, name_len);
        variable[name_len] = '\0';

        pvar++;
        if (*pvar != '\0') {
            value = util_url_unescape(pvar);
            /* skip values that could not be unescaped rather than
             * handing a NULL string to the dictionary */
            if (value != NULL) {
                util_dict_set(source->audio_info, variable, value);
                stats_event(source->mount, variable, value);
                free(value);
            }
        }
        free(variable);
    }
    return 1;
}

742
#ifdef HAVE_CURL
/* Replace the heap string held in *slot with a private copy of text.
 * The copy is made before the old string is released, so text may be the
 * string currently stored in *slot. If the copy cannot be allocated the
 * slot is left NULL.
 */
static void _yp_replace_string(char **slot, const char *text)
{
    char *copy = malloc(strlen(text) + 1);

    if (copy != NULL) {
        strcpy(copy, text);
    }
    free(*slot);
    *slot = copy;
}

/* Store one piece of YP information in every YP directory entry of the
 * source.
 *
 * source:    source whose ypdata[] entries are updated.
 * stat_name: stat key used for the fields that are also published with
 *            stats_event() (name, description, genre, url, bitrate,
 *            current song).
 * info:      for the string fields a NUL-terminated string, which is
 *            copied; for YP_URL_TIMEOUT, YP_LAST_TOUCH and
 *            YP_TOUCH_INTERVAL an integer smuggled through the pointer
 *            value. A NULL info makes the call a no-op.
 * type:      one of the YP_* selectors defined at the top of this file;
 *            unknown values are ignored.
 */
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type)
{
    int i;

    if (!info) {
        return;
    }
    for (i=0;i<source->num_yp_directories;i++) {
        char **slot = NULL;   /* string field to replace, if any */
        int publish = 0;      /* also report the value as a stat? */

        switch (type) {
        case YP_SERVER_NAME:
            slot = &source->ypdata[i]->server_name;
            publish = 1;
            break;
        case YP_SERVER_DESC:
            slot = &source->ypdata[i]->server_desc;
            publish = 1;
            break;
        case YP_SERVER_GENRE:
            slot = &source->ypdata[i]->server_genre;
            publish = 1;
            break;
        case YP_SERVER_URL:
            slot = &source->ypdata[i]->server_url;
            publish = 1;
            break;
        case YP_BITRATE:
            slot = &source->ypdata[i]->bitrate;
            publish = 1;
            break;
        case YP_AUDIO_INFO:
            slot = &source->ypdata[i]->audio_info;
            break;
        case YP_SERVER_TYPE:
            slot = &source->ypdata[i]->server_type;
            break;
        case YP_CURRENT_SONG:
            slot = &source->ypdata[i]->current_song;
            publish = 1;
            break;
        /* integer payloads: go through size_t so the pointer is never
         * cast straight to a narrower int */
        case YP_URL_TIMEOUT:
            source->ypdata[i]->yp_url_timeout = (int)(size_t)info;
            break;
        case YP_LAST_TOUCH:
            source->ypdata[i]->yp_last_touch = (int)(size_t)info;
            break;
        case YP_TOUCH_INTERVAL:
            source->ypdata[i]->yp_touch_interval = (int)(size_t)info;
            break;
        default:
            break;
        }

        if (slot != NULL) {
            _yp_replace_string(slot, (char *)info);
            if (publish) {
                stats_event(source->mount, stat_name, (char *)info);
            }
        }
    }
}
#endif