source.c 27.5 KB
Newer Older
1
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
Jack Moffitt's avatar
Jack Moffitt committed
2 3 4 5
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
6
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
7
#include <errno.h>
8 9

#ifndef _WIN32
10
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
11 12
#include <sys/time.h>
#include <sys/socket.h>
13
#else
14 15
#include <winsock2.h>
#include <windows.h>
16
#endif
Jack Moffitt's avatar
Jack Moffitt committed
17 18 19 20 21 22 23 24 25 26 27

#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "sock.h"

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
28
#include "log.h"
29
#include "logging.h"
30
#include "config.h"
31
#include "util.h"
brendan's avatar
brendan committed
32
#ifdef USE_YP
33
#include "geturl.h"
34
#endif
Jack Moffitt's avatar
Jack Moffitt committed
35
#include "source.h"
Michael Smith's avatar
Michael Smith committed
36
#include "format.h"
Jack Moffitt's avatar
Jack Moffitt committed
37

38 39 40
#undef CATMODULE
#define CATMODULE "source"

41 42 43 44 45 46 47 48 49 50 51 52
#define  YP_SERVER_NAME 1
#define  YP_SERVER_DESC 2
#define  YP_SERVER_GENRE 3
#define  YP_SERVER_URL 4
#define  YP_BITRATE 5
#define  YP_AUDIO_INFO 6
#define  YP_SERVER_TYPE 7
#define  YP_CURRENT_SONG 8
#define  YP_URL_TIMEOUT 9
#define  YP_TOUCH_INTERVAL 10
#define  YP_LAST_TOUCH 11

Jack Moffitt's avatar
Jack Moffitt committed
53 54 55
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
56
static int _parse_audio_info(source_t *source, char *s);
brendan's avatar
brendan committed
57
#ifdef USE_YP
58 59
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type);
60
#endif
Jack Moffitt's avatar
Jack Moffitt committed
61

62
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
63 64
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
65
{
66
    source_t *src;
Jack Moffitt's avatar
Jack Moffitt committed
67

68
    src = (source_t *)malloc(sizeof(source_t));
69
    src->client = client;
70
    src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
71
    src->fallback_mount = NULL;
72 73 74 75 76
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
77
    src->running = 1;
78 79
    src->num_yp_directories = 0;
    src->listeners = 0;
80
    src->max_listeners = -1;
81
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
82 83
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
84
    src->audio_info = util_dict_new();
Jack Moffitt's avatar
Jack Moffitt committed
85

Michael Smith's avatar
Michael Smith committed
86 87 88 89 90 91 92 93 94 95 96 97 98 99
    if(mountinfo != NULL) {
        src->fallback_mount = mountinfo->fallback_mount;
        src->max_listeners = mountinfo->max_listeners;
        src->dumpfilename = mountinfo->dumpfile;
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

100
    return src;
Jack Moffitt's avatar
Jack Moffitt committed
101 102
}

103 104 105 106 107
static int source_remove_source(void *key)
{
    return 1;
}

Jack Moffitt's avatar
Jack Moffitt committed
108 109 110 111 112
/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
136 137 138 139
}

int source_compare_sources(void *arg, void *a, void *b)
{
140 141
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
142

143
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
144 145 146 147
}

int source_free_source(void *key)
{
148
    source_t *source = key;
brendan's avatar
brendan committed
149 150 151
#ifdef USE_YP
    int i;
#endif
Jack Moffitt's avatar
Jack Moffitt committed
152

153
    free(source->mount);
Michael Smith's avatar
Michael Smith committed
154
    free(source->fallback_mount);
155
    client_destroy(source->client);
156 157 158
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
    source->format->free_plugin(source->format);
brendan's avatar
brendan committed
159
#ifdef USE_YP
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
160 161 162
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
163
#endif
164
    util_dict_free(source->audio_info);
165
    free(source);
Jack Moffitt's avatar
Jack Moffitt committed
166

167
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
168
}
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188

client_t *source_find_client(source_t *source, int id)
{
    client_t fakeclient;
    client_t *result;
    connection_t fakecon;

    fakeclient.con = &fakecon;
    fakeclient.con->id = id;

    avl_tree_rlock(source->client_tree);
    if(avl_get_by_key(source->client_tree, &fakeclient, (void **)&result) == 0)
    {
        avl_tree_unlock(source->client_tree);
        return result;
    }

    avl_tree_unlock(source->client_tree);
    return NULL;
}
189
    
Jack Moffitt's avatar
Jack Moffitt committed
190 191 192

void *source_main(void *arg)
{
193
    source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
194
    source_t *fallback_source;
195 196 197 198 199 200 201 202
    char buffer[4096];
    long bytes, sbytes;
    int ret, timeout;
    client_t *client;
    avl_node *client_node;

    refbuf_t *refbuf, *abuf;
    int data_done;
Jack Moffitt's avatar
Jack Moffitt committed
203

204
    int listeners = 0;
brendan's avatar
brendan committed
205 206 207 208 209
#ifdef USE_YP
    char *s;
    long current_time;
    char current_song[256];
    int    i;
210
    int    suppress_yp = 0;
211
    char *ai;
brendan's avatar
brendan committed
212
#endif
Jack Moffitt's avatar
Jack Moffitt committed
213

Michael Smith's avatar
Michael Smith committed
214 215 216 217 218 219 220 221
    long queue_limit;
    ice_config_t *config;
    char *hostname;
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
222
    timeout = config->source_timeout;
Michael Smith's avatar
Michael Smith committed
223 224 225
    hostname = config->hostname;
    port = config->port;

brendan's avatar
brendan committed
226
#ifdef USE_YP
227 228 229 230
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
Michael Smith's avatar
Michael Smith committed
231
                config->yp_url[i];
232
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
Michael Smith's avatar
Michael Smith committed
233
                config->yp_url_timeout[i];
234 235 236 237
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
238 239
#endif
    
Michael Smith's avatar
Michael Smith committed
240
    config_release_config();
241

242 243
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
244

245 246 247 248 249 250 251 252 253 254 255 256
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
    if (source_find_mount(source->mount) != NULL) {
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
        thread_exit(0);
        return NULL;
    }
257 258 259 260
    /* insert source onto source tree */
    avl_insert(global.source_tree, (void *)source);
    /* release write lock on global source tree */
    avl_tree_unlock(global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
261

262 263 264 265 266 267 268 269 270
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
271

272 273
    /* start off the statistics */
    source->listeners = 0;
274 275
    stats_event(source->mount, "listeners", "0");
    stats_event(source->mount, "type", source->format->format_description);
brendan's avatar
brendan committed
276
#ifdef USE_YP
277
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
278
        _add_yp_info(source, "server_name", s, YP_SERVER_NAME);
279 280
    }
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
281
        _add_yp_info(source, "server_url", s, YP_SERVER_URL);
282 283
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
284
        _add_yp_info(source, "genre", s, YP_SERVER_GENRE);
285 286
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
287
        _add_yp_info(source, "bitrate", s, YP_BITRATE);
288 289
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
290
        _add_yp_info(source, "server_description", s, YP_SERVER_DESC);
291 292 293 294 295 296
    }
    if ((s = httpp_getvar(source->parser, "ice-private"))) {
        stats_event(source->mount, "public", s);
        suppress_yp = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
297 298
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
299
            _add_yp_info(source, "audio_info", 
300 301 302
                    ai,
                    YP_AUDIO_INFO);
        }
303 304
    }
    for (i=0;i<source->num_yp_directories;i++) {
305 306 307
        _add_yp_info(source, "server_type", 
                     source->format->format_description,
                     YP_SERVER_TYPE);
308
    }
Jack Moffitt's avatar
Jack Moffitt committed
309

310
    for (i=0;i<source->num_yp_directories;i++) {
311
        int listen_url_size;
312 313 314 315 316
        if (source->ypdata[i]->listen_url) {
            free(source->ypdata[i]->listen_url);
        }
        /* 6 for max size of port */
        listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
317
            strlen(hostname) + 
318
            strlen(":") + 6 + strlen(source->mount) + 1;
319 320
        source->ypdata[i]->listen_url = malloc(listen_url_size);
        sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
321
                hostname, port, source->mount);
322
    }
323

324
    if(!suppress_yp) {
325 326
        yp_add(source, YP_ADD_ALL);

327
        current_time = time(NULL);
328

329 330 331
        _add_yp_info(source, "last_touch", (void *)current_time, 
            YP_LAST_TOUCH);

332
        for (i=0;i<source->num_yp_directories;i++) {
333 334 335 336
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
            source->ypdata[i]->yp_last_touch = current_time - 
                source->ypdata[i]->yp_touch_interval + 5;
337
            /* Don't permit touch intervals of less than 30 seconds */
338 339 340 341
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
        }
342
    }
343
#endif
344

345 346
    DEBUG0("Source creation complete");

347
    while (global.running == ICE_RUNNING && source->running) {
brendan's avatar
brendan committed
348
#ifdef USE_YP
349
        if(!suppress_yp) {
350
            current_time = time(NULL);
351 352
            for (i=0;i<source->num_yp_directories;i++) {
                if (current_time > (source->ypdata[i]->yp_last_touch + 
353 354
                            source->ypdata[i]->yp_touch_interval)) {
                    current_song[0] = 0;
355 356
                    if ((s = stats_get_value(source->mount, "artist"))) {
                        strncat(current_song, s, 
357
                                sizeof(current_song) - 1);
358 359 360
                        if (strlen(current_song) + 4 < 
                                sizeof(current_song)) 
                        {
361 362 363
                            strncat(current_song, " - ", 3);
                        }
                    }
brendan's avatar
brendan committed
364
                    if ((s = stats_get_value(source->mount, "title"))) {
365
                        if (strlen(current_song) + strlen(s)
366 367
                                < sizeof(current_song) -1) 
                        {
368
                            strncat(current_song, 
369
                                    s, 
370 371
                                    sizeof(current_song) - 1 - 
                                    strlen(current_song));
372 373
                        }
                    }
374
                    _add_yp_info(source, "current_song", current_song, YP_CURRENT_SONG);
375
                    thread_create("YP Touch Thread", yp_touch_thread, 
376
                            (void *)source, THREAD_DETACHED); 
377 378 379
                }
            }
        }
380
#endif
381
        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
382 383 384 385
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
386
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
387 388 389
        while (refbuf == NULL) {
            bytes = 0;
            while (bytes <= 0) {
390 391
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

392
                if (ret <= 0) { /* timeout expired */
393 394
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
395 396 397
                    bytes = 0;
                    break;
                }
Jack Moffitt's avatar
Jack Moffitt committed
398

399 400
                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
                if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) {
401 402
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
403
                    break;
404
                }
405 406
            }
            if (bytes <= 0) break;
407
            source->client->con->sent_bytes += bytes;
408
            ret = source->format->get_buffer(source->format, buffer, bytes, &refbuf);
Michael Smith's avatar
Michael Smith committed
409 410 411 412
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
        }

        if (bytes <= 0) {
            INFO0("Removing source following disconnection");
            break;
        }

        /* we have a refbuf buffer, which a data block to be sent to 
        ** all clients.  if a client is not able to send the buffer
        ** immediately, it should store it on its queue for the next
        ** go around.
        **
        ** instead of sending the current block, a client should send
        ** all data in the queue, plus the current block, until either
        ** it runs out of data, or it hits a recoverable error like
        ** EAGAIN.  this will allow a client that got slightly lagged
        ** to catch back up if it can
        */
Jack Moffitt's avatar
Jack Moffitt committed
431

Michael Smith's avatar
Michael Smith committed
432 433 434 435 436 437 438 439 440 441 442 443
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

444 445
        /* acquire read lock on client_tree */
        avl_tree_rlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
446

447 448 449 450
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            /* acquire read lock on node */
            avl_node_wlock(client_node);
Jack Moffitt's avatar
Jack Moffitt committed
451

452 453 454
            client = (client_t *)client_node->key;
            
            data_done = 0;
Jack Moffitt's avatar
Jack Moffitt committed
455

456 457 458 459 460 461 462
            /* do we have any old buffers? */
            abuf = refbuf_queue_remove(&client->queue);
            while (abuf) {
                if (client->pos > 0)
                    bytes = abuf->len - client->pos;
                else
                    bytes = abuf->len;
Jack Moffitt's avatar
Jack Moffitt committed
463

464
                sbytes = source->format->write_buf_to_client(source->format,
465
                        client, &abuf->data[client->pos], bytes);
466
                if (sbytes >= 0) {
467
                    if(sbytes != bytes) {
468 469 470
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
471 472 473 474 475 476
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
477 478 479
                else {
                    DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected");
                    client->con->error = 1;
480
                    data_done = 1;
481
                    refbuf_release(abuf);
482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497
                    break;
                }
                
                /* we're done with that refbuf, release it and reset the pos */
                refbuf_release(abuf);
                client->pos = 0;

                abuf = refbuf_queue_remove(&client->queue);
            }
            
            /* now send or queue the new data */
            if (data_done) {
                refbuf_addref(refbuf);
                refbuf_queue_add(&client->queue, refbuf);
            } else {
                sbytes = source->format->write_buf_to_client(source->format,
498
                        client, refbuf->data, refbuf->len);
499
                if (sbytes >= 0) {
500 501
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
502
                        client->pos = sbytes;
503
                        refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
504
                        refbuf_queue_insert(&client->queue, refbuf);
505 506
                    }
                }
507 508 509
                else {
                    DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection");
                    client->con->error = 1;
510 511 512 513 514 515 516 517
                }
            }

            /* if the client is too slow, its queue will slowly build up.
            ** we need to make sure the client is keeping up with the
            ** data, so we'll kick any client who's queue gets to large.
            */
            if (refbuf_queue_length(&client->queue) > queue_limit) {
518
                DEBUG0("Client has fallen too far behind, removing");
519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
                client->con->error = 1;
            }

            /* release read lock on node */
            avl_node_unlock(client_node);

            /* get the next node */
            client_node = avl_get_next(client_node);
        }
        /* release read lock on client_tree */
        avl_tree_unlock(source->client_tree);

        refbuf_release(refbuf);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** delete bad clients **/
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
            if (client->con->error) {
                client_node = avl_get_next(client_node);
                avl_delete(source->client_tree, (void *)client, _free_client);
                listeners--;
                stats_event_args(source->mount, "listeners", "%d", listeners);
                source->listeners = listeners;
546
                DEBUG0("Client removed");
547 548 549 550 551 552 553 554 555 556 557 558
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
            avl_insert(source->client_tree, client_node->key);
559 560
            /* listener count may have changed */
            listeners = source->listeners;
561
            listeners++;
562
            DEBUG0("Client added");
563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589
            stats_event_inc(NULL, "clients");
            stats_event_inc(source->mount, "connections");
            stats_event_args(source->mount, "listeners", "%d", listeners);
            source->listeners = listeners;

            /* we have to send cached headers for some data formats
            ** this is where we queue up the buffers to send
            */
            if (source->format->has_predata) {
                client = (client_t *)client_node->key;
                client->queue = source->format->get_predata(source->format);
            }

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
            avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, source_remove_client);
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
590

Michael Smith's avatar
Michael Smith committed
591 592
done:

593
    INFO1("Source \"%s\" exiting", source->mount);
594

brendan's avatar
brendan committed
595
#ifdef USE_YP
596 597 598
    if(!suppress_yp) {
        yp_remove(source);
    }
599 600
#endif
    
Michael Smith's avatar
Michael Smith committed
601 602 603 604
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

605 606 607 608
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
609 610 611
    avl_tree_wlock(global.source_tree);
    avl_delete(global.source_tree, source, source_remove_source);
    avl_tree_unlock(global.source_tree);
612

613 614 615
    /* we need to empty the client and pending trees */
    avl_tree_wlock(source->pending_tree);
    while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
616 617 618
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
619
            avl_delete(source->pending_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
620

621
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
622 623 624 625 626
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
627
            avl_delete(source->pending_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
628
        }
629 630
    }
    avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
631

632 633
    avl_tree_wlock(source->client_tree);
    while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
634 635 636
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
637
            avl_delete(source->client_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
638

639
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
640 641 642 643 644
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
645
            avl_delete(source->client_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
646
        }
647 648
    }
    avl_tree_unlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
649

650 651 652
    /* delete this sources stats */
    stats_event_dec(NULL, "sources");
    stats_event(source->mount, "listeners", NULL);
Jack Moffitt's avatar
Jack Moffitt committed
653

654 655 656
    global_lock();
    global.sources--;
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
657

Michael Smith's avatar
Michael Smith committed
658 659 660
    if(source->dumpfile)
        fclose(source->dumpfile);

661 662
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
663

664 665
    source_free_source(source);

666
    thread_exit(0);
Jack Moffitt's avatar
Jack Moffitt committed
667
      
668
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
669 670 671 672
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
673 674 675 676 677
    client_t *clienta = (client_t *)a;
    client_t *clientb = (client_t *)b;

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
678

679 680
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
681

682
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
683 684
}

685
int source_remove_client(void *key)
Jack Moffitt's avatar
Jack Moffitt committed
686
{
687
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
688 689 690 691
}

static int _free_client(void *key)
{
692 693 694 695 696 697 698 699 700 701
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");
    
    client_destroy(client);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
702
}
703

704 705
static int _parse_audio_info(source_t *source, char *s)
{
706 707 708 709
    char *token = NULL;
    char *pvar = NULL;
    char *variable = NULL;
    char *value = NULL;
710 711 712 713 714

    while ((token = strtok(s,";")) != NULL) {
        pvar = strchr(token, '=');
        if (pvar) {
            variable = (char *)malloc(pvar-token+1);
715
            strncpy(variable, token, pvar-token);    
716
            variable[pvar-token] = 0;
717 718
            pvar++;
            if (strlen(pvar)) {
719
                value = util_url_unescape(pvar);
720 721
                util_dict_set(source->audio_info, variable, value);
                stats_event(source->mount, variable, value);
722 723 724
                if (value) {
                    free(value);
                }
725 726
            }
            if (variable) {
727
                free(variable);
728 729 730 731 732 733 734
            }
        }
        s = NULL;
    }
    return 1;
}

brendan's avatar
brendan committed
735
#ifdef USE_YP
736 737 738
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type)
{
739
    char *escaped;
740 741 742 743 744 745 746
    int i;
    if (!info) {
        return;
    }
    for (i=0;i<source->num_yp_directories;i++) {
        switch (type) {
        case YP_SERVER_NAME:
747 748 749 750 751 752 753 754 755 756
                escaped = util_url_escape(info);
                if (escaped) {
                    if (source->ypdata[i]->server_name) {
                        free(source->ypdata[i]->server_name);
                    }
                    source->ypdata[i]->server_name = 
                         malloc(strlen((char *)escaped) +1);
                    strcpy(source->ypdata[i]->server_name, (char *)escaped);
                    stats_event(source->mount, stat_name, (char *)info);
                    free(escaped);
757 758 759
                }
                break;
        case YP_SERVER_DESC:
760 761 762 763 764 765 766 767 768 769
                escaped = util_url_escape(info);
                if (escaped) {
                    if (source->ypdata[i]->server_desc) {
                        free(source->ypdata[i]->server_desc);
                    }
                    source->ypdata[i]->server_desc = 
                        malloc(strlen((char *)escaped) +1);
                    strcpy(source->ypdata[i]->server_desc, (char *)escaped);
                    stats_event(source->mount, stat_name, (char *)info);
                    free(escaped);
770 771 772
                }
                break;
        case YP_SERVER_GENRE:
773 774 775 776 777 778 779 780 781 782
                escaped = util_url_escape(info);
                if (escaped) {
                    if (source->ypdata[i]->server_genre) {
                        free(source->ypdata[i]->server_genre);
                    }
                    source->ypdata[i]->server_genre = 
                        malloc(strlen((char *)escaped) +1);
                    strcpy(source->ypdata[i]->server_genre, (char *)escaped);
                    stats_event(source->mount, stat_name, (char *)info);
                    free(escaped);
783 784 785
                }
                break;
        case YP_SERVER_URL:
786 787 788 789 790 791 792 793 794 795
                escaped = util_url_escape(info);
                if (escaped) {
                    if (source->ypdata[i]->server_url) {
                        free(source->ypdata[i]->server_url);
                    }
                    source->ypdata[i]->server_url = 
                        malloc(strlen((char *)escaped) +1);
                    strcpy(source->ypdata[i]->server_url, (char *)escaped);
                    stats_event(source->mount, stat_name, (char *)info);
                    free(escaped);
796 797 798
                }
                break;
        case YP_BITRATE:
799 800 801 802 803 804 805 806 807 808
                escaped = util_url_escape(info);
                if (escaped) {
                    if (source->ypdata[i]->bitrate) {
                        free(source->ypdata[i]->bitrate);
                    }
                    source->ypdata[i]->bitrate = 
                        malloc(strlen((char *)escaped) +1);
                    strcpy(source->ypdata[i]->bitrate, (char *)escaped);
                    stats_event(source->mount, stat_name, (char *)info);
                    free(escaped);
809 810 811 812 813 814 815 816 817 818 819
                }
                break;
        case YP_AUDIO_INFO:
                if (source->ypdata[i]->audio_info) {
                    free(source->ypdata[i]->audio_info);
                }
                source->ypdata[i]->audio_info = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->audio_info, (char *)info);
                break;
        case YP_SERVER_TYPE:
820 821 822 823 824 825 826 827 828
                escaped = util_url_escape(info);
                if (escaped) {
                    if (source->ypdata[i]->server_type) {
                        free(source->ypdata[i]->server_type);
                    }
                    source->ypdata[i]->server_type = 
                        malloc(strlen((char *)escaped) +1);
                    strcpy(source->ypdata[i]->server_type, (char *)escaped);
                    free(escaped);
829 830 831
                }
                break;
        case YP_CURRENT_SONG:
832 833 834 835 836 837 838 839
                escaped = util_url_escape(info);
                if (escaped) {
                    if (source->ypdata[i]->current_song) {
                        free(source->ypdata[i]->current_song);
                    }
                    source->ypdata[i]->current_song = 
                        malloc(strlen((char *)escaped) +1);
                    strcpy(source->ypdata[i]->current_song, (char *)escaped);
840
                    stats_event(source->mount, "yp_currently_playing", (char *)info);
841
                    free(escaped);
842 843 844 845 846 847 848 849 850 851 852 853 854 855
                }
                break;
        case YP_URL_TIMEOUT:
                source->ypdata[i]->yp_url_timeout = (int)info;
                break;
        case YP_LAST_TOUCH:
                source->ypdata[i]->yp_last_touch = (int)info;
                break;
        case YP_TOUCH_INTERVAL:
                source->ypdata[i]->yp_touch_interval = (int)info;
                break;
        }
    }
}
856
#endif