source.c 26.1 KB
Newer Older
Jack Moffitt's avatar
Jack Moffitt committed
1 2 3 4
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
5
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
6
#include <errno.h>
7 8

#ifndef _WIN32
9
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
10 11
#include <sys/time.h>
#include <sys/socket.h>
12
#else
13 14
#include <winsock2.h>
#include <windows.h>
15
#endif
Jack Moffitt's avatar
Jack Moffitt committed
16 17 18 19 20 21 22 23 24 25 26

#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "sock.h"

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
27
#include "log.h"
28
#include "logging.h"
29
#include "config.h"
30
#include "util.h"
31
#include "geturl.h"
Jack Moffitt's avatar
Jack Moffitt committed
32
#include "source.h"
Michael Smith's avatar
Michael Smith committed
33
#include "format.h"
Jack Moffitt's avatar
Jack Moffitt committed
34

35 36 37
/* Category name for this file; presumably picked up by the logging macros
 * (WARN0, DEBUG0, ...) -- confirm in logging.h.  Any earlier definition is
 * dropped first. */
#undef CATMODULE
#define CATMODULE "source"

/* Selectors passed as the `type` argument of _add_yp_info(); each one names
 * the ypdata field that the call should update. */
#define  YP_SERVER_NAME 1
#define  YP_SERVER_DESC 2
#define  YP_SERVER_GENRE 3
#define  YP_SERVER_URL 4
#define  YP_BITRATE 5
#define  YP_AUDIO_INFO 6
#define  YP_SERVER_TYPE 7
#define  YP_CURRENT_SONG 8
#define  YP_URL_TIMEOUT 9
#define  YP_TOUCH_INTERVAL 10
#define  YP_LAST_TOUCH 11

Jack Moffitt's avatar
Jack Moffitt committed
50 51 52
/* avl tree helper */
/* ordering callback handed to avl_tree_new() for the client/pending trees */
static int _compare_clients(void *compare_arg, void *a, void *b);
/* avl free callback: updates the client counters and destroys the client */
static int _free_client(void *key);
/* parses the "ice-audio-info" header value into source->audio_info */
static int _parse_audio_info(source_t *source, char *s);
/* stores one value (selected by a YP_* type) in every ypdata of the source */
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type);
Jack Moffitt's avatar
Jack Moffitt committed
56

57
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
58 59
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
60
{
61
    source_t *src;
Jack Moffitt's avatar
Jack Moffitt committed
62

63
    src = (source_t *)malloc(sizeof(source_t));
64
    src->client = client;
65
    src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
66
    src->fallback_mount = NULL;
67 68 69 70 71
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
72
    src->running = 1;
73 74
    src->num_yp_directories = 0;
    src->listeners = 0;
75
    src->max_listeners = -1;
76
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
77 78
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
79
    src->audio_info = util_dict_new();
Jack Moffitt's avatar
Jack Moffitt committed
80

Michael Smith's avatar
Michael Smith committed
81 82 83 84 85 86 87 88 89 90 91 92 93 94
    if(mountinfo != NULL) {
        src->fallback_mount = mountinfo->fallback_mount;
        src->max_listeners = mountinfo->max_listeners;
        src->dumpfilename = mountinfo->dumpfile;
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

95
    return src;
Jack Moffitt's avatar
Jack Moffitt committed
96 97
}

98 99 100 101 102
/* Do-nothing delete callback: lets avl_delete() unlink a source from the
 * global source tree without freeing it.  Always returns 1. */
static int source_remove_source(void *key)
{
    (void)key;  /* the source itself is left untouched */

    return 1;
}

Jack Moffitt's avatar
Jack Moffitt committed
103 104 105 106 107
/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
131 132 133 134
}

/* Ordering function for sources: compares the mountpoint names of the two
 * source_t objects with strcmp() and returns its result.  `arg` is unused. */
int source_compare_sources(void *arg, void *a, void *b)
{
    const char *mount_a = ((source_t *)a)->mount;
    const char *mount_b = ((source_t *)b)->mount;

    (void)arg;

    return strcmp(mount_a, mount_b);
}

int source_free_source(void *key)
{
143
    source_t *source = key;
144
    int i=0;
Jack Moffitt's avatar
Jack Moffitt committed
145

146
    free(source->mount);
Michael Smith's avatar
Michael Smith committed
147
    free(source->fallback_mount);
148
    client_destroy(source->client);
149 150 151
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
    source->format->free_plugin(source->format);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
152 153 154
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
155
    util_dict_free(source->audio_info);
156
    free(source);
Jack Moffitt's avatar
Jack Moffitt committed
157

158
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
159
}
160
    
Jack Moffitt's avatar
Jack Moffitt committed
161 162 163

void *source_main(void *arg)
{
164
    source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
165
    source_t *fallback_source;
166 167 168 169 170 171 172 173 174 175 176
    char buffer[4096];
    long bytes, sbytes;
    int ret, timeout;
    client_t *client;
    avl_node *client_node;
    char *s;
    long current_time;
    char current_song[256];

    refbuf_t *refbuf, *abuf;
    int data_done;
Jack Moffitt's avatar
Jack Moffitt committed
177

178
    int listeners = 0;
179 180
    int    i=0;
    int    suppress_yp = 0;
181
    char *ai;
Jack Moffitt's avatar
Jack Moffitt committed
182

Michael Smith's avatar
Michael Smith committed
183 184 185 186 187 188 189 190
    long queue_limit;
    ice_config_t *config;
    char *hostname;
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
191
    timeout = config->source_timeout;
Michael Smith's avatar
Michael Smith committed
192 193 194
    hostname = config->hostname;
    port = config->port;

195 196 197 198
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
Michael Smith's avatar
Michael Smith committed
199
                config->yp_url[i];
200
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
Michael Smith's avatar
Michael Smith committed
201
                config->yp_url_timeout[i];
202 203 204 205
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
206

Michael Smith's avatar
Michael Smith committed
207
    config_release_config();
208

209 210
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
211

212 213 214 215 216 217 218 219 220 221 222 223
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
    if (source_find_mount(source->mount) != NULL) {
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
        thread_exit(0);
        return NULL;
    }
224 225 226 227
    /* insert source onto source tree */
    avl_insert(global.source_tree, (void *)source);
    /* release write lock on global source tree */
    avl_tree_unlock(global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
228

229 230 231 232 233 234 235 236 237
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
238

239 240 241 242
    /* start off the statistics */
    stats_event(source->mount, "listeners", "0");
    source->listeners = 0;
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
243
        _add_yp_info(source, "server_name", s, YP_SERVER_NAME);
244 245
    }
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
246
        _add_yp_info(source, "server_url", s, YP_SERVER_URL);
247 248
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
249
        _add_yp_info(source, "genre", s, YP_SERVER_GENRE);
250 251
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
252
        _add_yp_info(source, "bitrate", s, YP_BITRATE);
253 254
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
255
        _add_yp_info(source, "server_description", s, YP_SERVER_DESC);
256 257 258 259 260 261
    }
    if ((s = httpp_getvar(source->parser, "ice-private"))) {
        stats_event(source->mount, "public", s);
        suppress_yp = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
262 263
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
264
            _add_yp_info(source, "audio_info", 
265 266 267
                    ai,
                    YP_AUDIO_INFO);
        }
268 269 270 271 272 273
    }
    for (i=0;i<source->num_yp_directories;i++) {
        if (source->ypdata[i]->server_type) {
            free(source->ypdata[i]->server_type);
        }
        source->ypdata[i]->server_type = malloc(
274
                strlen(source->format->format_description) + 1);
275
        strcpy(source->ypdata[i]->server_type, 
276
                source->format->format_description);
277
    }
278
    stats_event(source->mount, "type", source->format->format_description);
Jack Moffitt's avatar
Jack Moffitt committed
279

280
    for (i=0;i<source->num_yp_directories;i++) {
281
        int listen_url_size;
282 283 284 285 286
        if (source->ypdata[i]->listen_url) {
            free(source->ypdata[i]->listen_url);
        }
        /* 6 for max size of port */
        listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
287
            strlen(hostname) + 
288
            strlen(":") + 6 + strlen(source->mount) + 1;
289 290
        source->ypdata[i]->listen_url = malloc(listen_url_size);
        sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
291
                hostname, port, source->mount);
292
    }
293

294
    if(!suppress_yp) {
295 296
        yp_add(source, YP_ADD_ALL);

297
        current_time = time(NULL);
298

299 300 301
        _add_yp_info(source, "last_touch", (void *)current_time, 
            YP_LAST_TOUCH);

302
        for (i=0;i<source->num_yp_directories;i++) {
303 304 305 306
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
            source->ypdata[i]->yp_last_touch = current_time - 
                source->ypdata[i]->yp_touch_interval + 5;
307
            /* Don't permit touch intervals of less than 30 seconds */
308 309 310 311
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
        }
312 313
    }

314 315
    DEBUG0("Source creation complete");

316 317
    while (global.running == ICE_RUNNING && source->running) {
        if(!suppress_yp) {
318
            current_time = time(NULL);
319 320
            for (i=0;i<source->num_yp_directories;i++) {
                if (current_time > (source->ypdata[i]->yp_last_touch + 
321 322
                            source->ypdata[i]->yp_touch_interval)) {
                    current_song[0] = 0;
323 324
                    if (stats_get_value(source->mount, "artist")) {
                        strncat(current_song, 
325 326
                                stats_get_value(source->mount, "artist"), 
                                sizeof(current_song) - 1);
327 328 329 330 331 332
                        if (strlen(current_song) + 4 < sizeof(current_song)) {
                            strncat(current_song, " - ", 3);
                        }
                    }
                    if (stats_get_value(source->mount, "title")) {
                        if (strlen(current_song) + 
333 334 335
                                strlen(stats_get_value(source->mount, "title"))
                                < sizeof(current_song) -1) 
                        {
336
                            strncat(current_song, 
337 338 339
                                    stats_get_value(source->mount, "title"), 
                                    sizeof(current_song) - 1 - 
                                    strlen(current_song));
340 341
                        }
                    }
342
                    
343 344 345 346 347 348
                    if (source->ypdata[i]->current_song) {
                        free(source->ypdata[i]->current_song);
                        source->ypdata[i]->current_song = NULL;
                    }
    
                    source->ypdata[i]->current_song = 
349
                        malloc(strlen(current_song) + 1);
350 351 352
                    strcpy(source->ypdata[i]->current_song, current_song);
    
                    thread_create("YP Touch Thread", yp_touch_thread, 
353
                            (void *)source, THREAD_DETACHED); 
354 355 356
                }
            }
        }
357

358
        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
359 360 361 362
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
363
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
364 365 366
        while (refbuf == NULL) {
            bytes = 0;
            while (bytes <= 0) {
367 368
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

369
                if (ret <= 0) { /* timeout expired */
370 371
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
372 373 374
                    bytes = 0;
                    break;
                }
Jack Moffitt's avatar
Jack Moffitt committed
375

376 377
                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
                if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) {
378 379
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
380
                    break;
381
                }
382 383
            }
            if (bytes <= 0) break;
384
            source->client->con->sent_bytes += bytes;
385
            ret = source->format->get_buffer(source->format, buffer, bytes, &refbuf);
Michael Smith's avatar
Michael Smith committed
386 387 388 389
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
        }

        if (bytes <= 0) {
            INFO0("Removing source following disconnection");
            break;
        }

        /* we have a refbuf buffer, which a data block to be sent to 
        ** all clients.  if a client is not able to send the buffer
        ** immediately, it should store it on its queue for the next
        ** go around.
        **
        ** instead of sending the current block, a client should send
        ** all data in the queue, plus the current block, until either
        ** it runs out of data, or it hits a recoverable error like
        ** EAGAIN.  this will allow a client that got slightly lagged
        ** to catch back up if it can
        */
Jack Moffitt's avatar
Jack Moffitt committed
408

Michael Smith's avatar
Michael Smith committed
409 410 411 412 413 414 415 416 417 418 419 420
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

421 422
        /* acquire read lock on client_tree */
        avl_tree_rlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
423

424 425 426 427
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            /* acquire read lock on node */
            avl_node_wlock(client_node);
Jack Moffitt's avatar
Jack Moffitt committed
428

429 430 431
            client = (client_t *)client_node->key;
            
            data_done = 0;
Jack Moffitt's avatar
Jack Moffitt committed
432

433 434 435 436 437 438 439
            /* do we have any old buffers? */
            abuf = refbuf_queue_remove(&client->queue);
            while (abuf) {
                if (client->pos > 0)
                    bytes = abuf->len - client->pos;
                else
                    bytes = abuf->len;
Jack Moffitt's avatar
Jack Moffitt committed
440

441
                sbytes = source->format->write_buf_to_client(source->format,
442
                        client, &abuf->data[client->pos], bytes);
443
                if (sbytes >= 0) {
444
                    if(sbytes != bytes) {
445 446 447
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
448 449 450 451 452 453
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
454 455 456
                else {
                    DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected");
                    client->con->error = 1;
457
                    data_done = 1;
458
                    refbuf_release(abuf);
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
                    break;
                }
                
                /* we're done with that refbuf, release it and reset the pos */
                refbuf_release(abuf);
                client->pos = 0;

                abuf = refbuf_queue_remove(&client->queue);
            }
            
            /* now send or queue the new data */
            if (data_done) {
                refbuf_addref(refbuf);
                refbuf_queue_add(&client->queue, refbuf);
            } else {
                sbytes = source->format->write_buf_to_client(source->format,
475
                        client, refbuf->data, refbuf->len);
476
                if (sbytes >= 0) {
477 478
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
479
                        client->pos = sbytes;
480
                        refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
481
                        refbuf_queue_insert(&client->queue, refbuf);
482 483
                    }
                }
484 485 486
                else {
                    DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection");
                    client->con->error = 1;
487 488 489 490 491 492 493 494
                }
            }

            /* if the client is too slow, its queue will slowly build up.
            ** we need to make sure the client is keeping up with the
            ** data, so we'll kick any client who's queue gets to large.
            */
            if (refbuf_queue_length(&client->queue) > queue_limit) {
495
                DEBUG0("Client has fallen too far behind, removing");
496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522
                client->con->error = 1;
            }

            /* release read lock on node */
            avl_node_unlock(client_node);

            /* get the next node */
            client_node = avl_get_next(client_node);
        }
        /* release read lock on client_tree */
        avl_tree_unlock(source->client_tree);

        refbuf_release(refbuf);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** delete bad clients **/
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
            if (client->con->error) {
                client_node = avl_get_next(client_node);
                avl_delete(source->client_tree, (void *)client, _free_client);
                listeners--;
                stats_event_args(source->mount, "listeners", "%d", listeners);
                source->listeners = listeners;
523
                DEBUG0("Client removed");
524 525 526 527 528 529 530 531 532 533 534 535 536
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
            avl_insert(source->client_tree, client_node->key);
            listeners++;
537
            DEBUG0("Client added");
538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564
            stats_event_inc(NULL, "clients");
            stats_event_inc(source->mount, "connections");
            stats_event_args(source->mount, "listeners", "%d", listeners);
            source->listeners = listeners;

            /* we have to send cached headers for some data formats
            ** this is where we queue up the buffers to send
            */
            if (source->format->has_predata) {
                client = (client_t *)client_node->key;
                client->queue = source->format->get_predata(source->format);
            }

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
            avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, source_remove_client);
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
565

Michael Smith's avatar
Michael Smith committed
566 567
done:

568
    DEBUG0("Source exiting");
569 570 571
    if(!suppress_yp) {
        yp_remove(source);
    }
Jack Moffitt's avatar
Jack Moffitt committed
572

Michael Smith's avatar
Michael Smith committed
573 574 575 576
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

577 578 579 580
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
581 582 583
    avl_tree_wlock(global.source_tree);
    avl_delete(global.source_tree, source, source_remove_source);
    avl_tree_unlock(global.source_tree);
584

585 586 587
    /* we need to empty the client and pending trees */
    avl_tree_wlock(source->pending_tree);
    while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
588 589 590
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
591
            avl_delete(source->pending_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
592

593
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
594 595 596 597 598
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
599
            avl_delete(source->pending_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
600
        }
601 602
    }
    avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
603

604 605
    avl_tree_wlock(source->client_tree);
    while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
606 607 608
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
609
            avl_delete(source->client_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
610

611
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
612 613 614 615 616
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
617
            avl_delete(source->client_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
618
        }
619 620
    }
    avl_tree_unlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
621

622 623 624
    /* delete this sources stats */
    stats_event_dec(NULL, "sources");
    stats_event(source->mount, "listeners", NULL);
Jack Moffitt's avatar
Jack Moffitt committed
625

626 627 628
    global_lock();
    global.sources--;
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
629

Michael Smith's avatar
Michael Smith committed
630 631 632
    if(source->dumpfile)
        fclose(source->dumpfile);

633 634
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
635

636 637
    source_free_source(source);

638
    thread_exit(0);
Jack Moffitt's avatar
Jack Moffitt committed
639
      
640
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
641 642 643 644
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
645
    connection_t *cona = (connection_t *)a;
646
    connection_t *conb = (connection_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
647

648 649
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
650

651
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
652 653
}

654
/* Do-nothing delete callback: lets avl_delete() unlink a client from a tree
 * without destroying it, e.g. when the client is being moved to another
 * tree.  Always returns 1. */
int source_remove_client(void *key)
{
    (void)key;  /* the client itself is left untouched */

    return 1;
}

static int _free_client(void *key)
{
661 662 663 664 665 666 667 668 669 670
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");
    
    client_destroy(client);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
671
}
672

673 674
/* Parse an "ice-audio-info" style string of the form "name=value;name=value".
 *
 * For every ';'-separated item that contains '=' followed by a non-empty
 * value, the url-unescaped value is stored under `name` in
 * source->audio_info and published as a stat on source->mount.  Items with
 * no '=' or with an empty value are ignored.
 *
 * The string s is modified in place: the ';' ending each item is
 * overwritten with a NUL, exactly as strtok() would do.  strtok() itself is
 * avoided because it keeps hidden static state and this function runs on
 * source threads.
 *
 * Always returns 1.
 */
static int _parse_audio_info(source_t *source, char *s)
{
    char *cursor = s;

    while (cursor != NULL) {
        char *token;
        char *sep;
        char *eq;
        char *variable;
        char *value;
        size_t name_len;

        /* skip leading/empty separators, stop at the end of the string */
        token = cursor + strspn(cursor, ";");
        if (*token == '\0') {
            break;
        }

        /* terminate this item and remember where the next one starts */
        sep = strchr(token, ';');
        if (sep != NULL) {
            *sep = '\0';
            cursor = sep + 1;
        } else {
            cursor = NULL;
        }

        eq = strchr(token, '=');
        if (eq == NULL || eq[1] == '\0') {
            continue;   /* no '=' or nothing after it */
        }

        name_len = (size_t)(eq - token);
        variable = malloc(name_len + 1);
        if (variable == NULL) {
            continue;   /* out of memory: drop this item */
        }
        memcpy(variable, token, name_len);
        variable[name_len] = '\0';

        value = util_url_unescape(eq + 1);
        if (value != NULL) {
            util_dict_set(source->audio_info, variable, value);
            stats_event(source->mount, variable, value);
            free(value);
        }
        free(variable);
    }
    return 1;
}

/* Replace the heap string held in *slot with a private copy of value.
 * If the copy cannot be allocated the previous string is kept. */
static void _set_yp_string(char **slot, const char *value)
{
    char *copy = strdup(value);

    if (copy == NULL) {
        return;
    }
    free(*slot);
    *slot = copy;
}

/* Store one piece of YP information in every ypdata entry of the source.
 *
 * source    - source whose ypdata entries are updated.
 * stat_name - stat name under which string values are also published on
 *             source->mount (not used for YP_AUDIO_INFO, YP_SERVER_TYPE or
 *             the integer types).
 * info      - for the string types a NUL-terminated string, which is
 *             copied; for YP_URL_TIMEOUT, YP_LAST_TOUCH and
 *             YP_TOUCH_INTERVAL an integer smuggled through the pointer.
 *             A NULL info (and therefore an integer value of 0) makes the
 *             call a no-op.
 * type      - one of the YP_* selectors; unknown values are ignored.
 *
 * Nothing happens when the source has no ypdata entries, so in that case
 * no stats are published either.
 */
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type)
{
    int i;
    if (!info) {
        return;
    }
    for (i=0;i<source->num_yp_directories;i++) {
        switch (type) {
        case YP_SERVER_NAME:
                _set_yp_string(&source->ypdata[i]->server_name, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_SERVER_DESC:
                _set_yp_string(&source->ypdata[i]->server_desc, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_SERVER_GENRE:
                _set_yp_string(&source->ypdata[i]->server_genre, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_SERVER_URL:
                _set_yp_string(&source->ypdata[i]->server_url, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_BITRATE:
                _set_yp_string(&source->ypdata[i]->bitrate, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_AUDIO_INFO:
                _set_yp_string(&source->ypdata[i]->audio_info, (char *)info);
                break;
        case YP_SERVER_TYPE:
                _set_yp_string(&source->ypdata[i]->server_type, (char *)info);
                break;
        case YP_CURRENT_SONG:
                _set_yp_string(&source->ypdata[i]->current_song, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        /* integer payloads: go through long first so the pointer is not
         * cast directly to a narrower integer type */
        case YP_URL_TIMEOUT:
                source->ypdata[i]->yp_url_timeout = (int)(long)info;
                break;
        case YP_LAST_TOUCH:
                source->ypdata[i]->yp_last_touch = (int)(long)info;
                break;
        case YP_TOUCH_INTERVAL:
                source->ypdata[i]->yp_touch_interval = (int)(long)info;
                break;
        default:
                break;
        }
    }
}