source.c 26.1 KB
Newer Older
Jack Moffitt's avatar
Jack Moffitt committed
1 2 3 4
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
5
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
6
#include <errno.h>
7 8

#ifndef _WIN32
9
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
10 11
#include <sys/time.h>
#include <sys/socket.h>
12
#else
13 14
#include <winsock2.h>
#include <windows.h>
15
#endif
Jack Moffitt's avatar
Jack Moffitt committed
16 17 18 19 20 21 22 23 24 25 26

#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "sock.h"

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
27
#include "log.h"
28
#include "logging.h"
29
#include "config.h"
30
#include "util.h"
31
#include "geturl.h"
Jack Moffitt's avatar
Jack Moffitt committed
32
#include "source.h"
Michael Smith's avatar
Michael Smith committed
33
#include "format.h"
Jack Moffitt's avatar
Jack Moffitt committed
34

35 36 37
#undef CATMODULE
#define CATMODULE "source"

38 39 40 41 42 43 44 45 46 47 48 49
#define  YP_SERVER_NAME 1
#define  YP_SERVER_DESC 2
#define  YP_SERVER_GENRE 3
#define  YP_SERVER_URL 4
#define  YP_BITRATE 5
#define  YP_AUDIO_INFO 6
#define  YP_SERVER_TYPE 7
#define  YP_CURRENT_SONG 8
#define  YP_URL_TIMEOUT 9
#define  YP_TOUCH_INTERVAL 10
#define  YP_LAST_TOUCH 11

Jack Moffitt's avatar
Jack Moffitt committed
50 51 52
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
53 54 55
static int _parse_audio_info(source_t *source, char *s);
static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type);
Jack Moffitt's avatar
Jack Moffitt committed
56

57
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
58 59
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
60
{
61
    source_t *src;
Jack Moffitt's avatar
Jack Moffitt committed
62

63
    src = (source_t *)malloc(sizeof(source_t));
64
    src->client = client;
65
    src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
66
    src->fallback_mount = NULL;
67 68 69 70 71
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
72
    src->running = 1;
73 74
    src->num_yp_directories = 0;
    src->listeners = 0;
75
    src->max_listeners = -1;
76
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
77 78
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
79
    src->audio_info = util_dict_new();
Jack Moffitt's avatar
Jack Moffitt committed
80

Michael Smith's avatar
Michael Smith committed
81 82 83 84 85 86 87 88 89 90 91 92 93 94
    if(mountinfo != NULL) {
        src->fallback_mount = mountinfo->fallback_mount;
        src->max_listeners = mountinfo->max_listeners;
        src->dumpfilename = mountinfo->dumpfile;
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

95
    return src;
Jack Moffitt's avatar
Jack Moffitt committed
96 97
}

98 99 100 101 102
static int source_remove_source(void *key)
{
    return 1;
}

Jack Moffitt's avatar
Jack Moffitt committed
103 104 105 106 107
/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
131 132 133 134
}

int source_compare_sources(void *arg, void *a, void *b)
{
135 136
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
137

138
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
139 140 141 142
}

int source_free_source(void *key)
{
143
    source_t *source = key;
144
    int i=0;
Jack Moffitt's avatar
Jack Moffitt committed
145

146
    free(source->mount);
Michael Smith's avatar
Michael Smith committed
147
    free(source->fallback_mount);
148
    client_destroy(source->client);
149 150 151
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
    source->format->free_plugin(source->format);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
152 153 154
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
155
    util_dict_free(source->audio_info);
156
    free(source);
Jack Moffitt's avatar
Jack Moffitt committed
157

158
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
159
}
160
    
Jack Moffitt's avatar
Jack Moffitt committed
161

162 163 164
/* The caller MUST have a current write lock on global.source_tree when calling
 * this
 */
Jack Moffitt's avatar
Jack Moffitt committed
165 166
void *source_main(void *arg)
{
167
    source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
168
    source_t *fallback_source;
169 170 171 172 173 174 175 176 177 178 179
    char buffer[4096];
    long bytes, sbytes;
    int ret, timeout;
    client_t *client;
    avl_node *client_node;
    char *s;
    long current_time;
    char current_song[256];

    refbuf_t *refbuf, *abuf;
    int data_done;
Jack Moffitt's avatar
Jack Moffitt committed
180

181
    int listeners = 0;
182 183
    int    i=0;
    int    suppress_yp = 0;
184
    char *ai;
Jack Moffitt's avatar
Jack Moffitt committed
185

Michael Smith's avatar
Michael Smith committed
186 187 188 189 190 191 192 193
    long queue_limit;
    ice_config_t *config;
    char *hostname;
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
194
    timeout = config->source_timeout;
Michael Smith's avatar
Michael Smith committed
195 196 197
    hostname = config->hostname;
    port = config->port;

198 199 200 201
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
Michael Smith's avatar
Michael Smith committed
202
                config->yp_url[i];
203
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
Michael Smith's avatar
Michael Smith committed
204
                config->yp_url_timeout[i];
205 206 207 208
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
209

Michael Smith's avatar
Michael Smith committed
210
    config_release_config();
211

212 213
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
214

215 216 217 218 219 220 221 222 223 224 225 226
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
    if (source_find_mount(source->mount) != NULL) {
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
        thread_exit(0);
        return NULL;
    }
227 228 229 230
    /* insert source onto source tree */
    avl_insert(global.source_tree, (void *)source);
    /* release write lock on global source tree */
    avl_tree_unlock(global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
231

232 233 234 235 236 237 238 239 240
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
241

242 243 244 245
    /* start off the statistics */
    stats_event(source->mount, "listeners", "0");
    source->listeners = 0;
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
246
        _add_yp_info(source, "server_name", s, YP_SERVER_NAME);
247 248
    }
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
249
        _add_yp_info(source, "server_url", s, YP_SERVER_URL);
250 251
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
252
        _add_yp_info(source, "genre", s, YP_SERVER_GENRE);
253 254
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
255
        _add_yp_info(source, "bitrate", s, YP_BITRATE);
256 257
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
258
        _add_yp_info(source, "server_description", s, YP_SERVER_DESC);
259 260 261 262 263 264
    }
    if ((s = httpp_getvar(source->parser, "ice-private"))) {
        stats_event(source->mount, "public", s);
        suppress_yp = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
265 266
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
267
            _add_yp_info(source, "audio_info", 
268 269 270
                    ai,
                    YP_AUDIO_INFO);
        }
271 272 273 274 275 276
    }
    for (i=0;i<source->num_yp_directories;i++) {
        if (source->ypdata[i]->server_type) {
            free(source->ypdata[i]->server_type);
        }
        source->ypdata[i]->server_type = malloc(
277
                strlen(source->format->format_description) + 1);
278
        strcpy(source->ypdata[i]->server_type, 
279
                source->format->format_description);
280
    }
281
    stats_event(source->mount, "type", source->format->format_description);
Jack Moffitt's avatar
Jack Moffitt committed
282

283
    for (i=0;i<source->num_yp_directories;i++) {
284
        int listen_url_size;
285 286 287 288 289
        if (source->ypdata[i]->listen_url) {
            free(source->ypdata[i]->listen_url);
        }
        /* 6 for max size of port */
        listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
290
            strlen(hostname) + 
291
            strlen(":") + 6 + strlen(source->mount) + 1;
292 293
        source->ypdata[i]->listen_url = malloc(listen_url_size);
        sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
294
                hostname, port, source->mount);
295
    }
296

297
    if(!suppress_yp) {
298 299
        yp_add(source, YP_ADD_ALL);

300
        current_time = time(NULL);
301

302 303 304
        _add_yp_info(source, "last_touch", (void *)current_time, 
            YP_LAST_TOUCH);

305
        for (i=0;i<source->num_yp_directories;i++) {
306 307 308 309
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
            source->ypdata[i]->yp_last_touch = current_time - 
                source->ypdata[i]->yp_touch_interval + 5;
310
            /* Don't permit touch intervals of less than 30 seconds */
311 312 313 314
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
        }
315 316
    }

317 318
    DEBUG0("Source creation complete");

319 320
    while (global.running == ICE_RUNNING && source->running) {
        if(!suppress_yp) {
321
            current_time = time(NULL);
322 323
            for (i=0;i<source->num_yp_directories;i++) {
                if (current_time > (source->ypdata[i]->yp_last_touch + 
324 325
                            source->ypdata[i]->yp_touch_interval)) {
                    current_song[0] = 0;
326 327
                    if (stats_get_value(source->mount, "artist")) {
                        strncat(current_song, 
328 329
                                stats_get_value(source->mount, "artist"), 
                                sizeof(current_song) - 1);
330 331 332 333 334 335
                        if (strlen(current_song) + 4 < sizeof(current_song)) {
                            strncat(current_song, " - ", 3);
                        }
                    }
                    if (stats_get_value(source->mount, "title")) {
                        if (strlen(current_song) + 
336 337 338
                                strlen(stats_get_value(source->mount, "title"))
                                < sizeof(current_song) -1) 
                        {
339
                            strncat(current_song, 
340 341 342
                                    stats_get_value(source->mount, "title"), 
                                    sizeof(current_song) - 1 - 
                                    strlen(current_song));
343 344
                        }
                    }
345
                    
346 347 348 349 350 351
                    if (source->ypdata[i]->current_song) {
                        free(source->ypdata[i]->current_song);
                        source->ypdata[i]->current_song = NULL;
                    }
    
                    source->ypdata[i]->current_song = 
352
                        malloc(strlen(current_song) + 1);
353 354 355
                    strcpy(source->ypdata[i]->current_song, current_song);
    
                    thread_create("YP Touch Thread", yp_touch_thread, 
356
                            (void *)source, THREAD_DETACHED); 
357 358 359
                }
            }
        }
360

361
        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
362 363 364 365
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
366
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
367 368 369
        while (refbuf == NULL) {
            bytes = 0;
            while (bytes <= 0) {
370 371
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

372
                if (ret <= 0) { /* timeout expired */
373 374
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
375 376 377
                    bytes = 0;
                    break;
                }
Jack Moffitt's avatar
Jack Moffitt committed
378

379 380
                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
                if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) {
381 382
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
383
                    break;
384
                }
385 386
            }
            if (bytes <= 0) break;
387
            source->client->con->sent_bytes += bytes;
388
            ret = source->format->get_buffer(source->format, buffer, bytes, &refbuf);
Michael Smith's avatar
Michael Smith committed
389 390 391 392
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410
        }

        if (bytes <= 0) {
            INFO0("Removing source following disconnection");
            break;
        }

        /* we have a refbuf buffer, which a data block to be sent to 
        ** all clients.  if a client is not able to send the buffer
        ** immediately, it should store it on its queue for the next
        ** go around.
        **
        ** instead of sending the current block, a client should send
        ** all data in the queue, plus the current block, until either
        ** it runs out of data, or it hits a recoverable error like
        ** EAGAIN.  this will allow a client that got slightly lagged
        ** to catch back up if it can
        */
Jack Moffitt's avatar
Jack Moffitt committed
411

Michael Smith's avatar
Michael Smith committed
412 413 414 415 416 417 418 419 420 421 422 423
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

424 425
        /* acquire read lock on client_tree */
        avl_tree_rlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
426

427 428 429 430
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            /* acquire read lock on node */
            avl_node_wlock(client_node);
Jack Moffitt's avatar
Jack Moffitt committed
431

432 433 434
            client = (client_t *)client_node->key;
            
            data_done = 0;
Jack Moffitt's avatar
Jack Moffitt committed
435

436 437 438 439 440 441 442
            /* do we have any old buffers? */
            abuf = refbuf_queue_remove(&client->queue);
            while (abuf) {
                if (client->pos > 0)
                    bytes = abuf->len - client->pos;
                else
                    bytes = abuf->len;
Jack Moffitt's avatar
Jack Moffitt committed
443

444
                sbytes = source->format->write_buf_to_client(source->format,
445
                        client, &abuf->data[client->pos], bytes);
446
                if (sbytes >= 0) {
447
                    if(sbytes != bytes) {
448 449 450
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
451 452 453 454 455 456
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
457 458 459
                else {
                    DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected");
                    client->con->error = 1;
460
                    data_done = 1;
461
                    refbuf_release(abuf);
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
                    break;
                }
                
                /* we're done with that refbuf, release it and reset the pos */
                refbuf_release(abuf);
                client->pos = 0;

                abuf = refbuf_queue_remove(&client->queue);
            }
            
            /* now send or queue the new data */
            if (data_done) {
                refbuf_addref(refbuf);
                refbuf_queue_add(&client->queue, refbuf);
            } else {
                sbytes = source->format->write_buf_to_client(source->format,
478
                        client, refbuf->data, refbuf->len);
479
                if (sbytes >= 0) {
480 481
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
482
                        client->pos = sbytes;
483
                        refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
484
                        refbuf_queue_insert(&client->queue, refbuf);
485 486
                    }
                }
487 488 489
                else {
                    DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection");
                    client->con->error = 1;
490 491 492 493 494 495 496 497
                }
            }

            /* if the client is too slow, its queue will slowly build up.
            ** we need to make sure the client is keeping up with the
            ** data, so we'll kick any client who's queue gets to large.
            */
            if (refbuf_queue_length(&client->queue) > queue_limit) {
498
                DEBUG0("Client has fallen too far behind, removing");
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525
                client->con->error = 1;
            }

            /* release read lock on node */
            avl_node_unlock(client_node);

            /* get the next node */
            client_node = avl_get_next(client_node);
        }
        /* release read lock on client_tree */
        avl_tree_unlock(source->client_tree);

        refbuf_release(refbuf);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** delete bad clients **/
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
            if (client->con->error) {
                client_node = avl_get_next(client_node);
                avl_delete(source->client_tree, (void *)client, _free_client);
                listeners--;
                stats_event_args(source->mount, "listeners", "%d", listeners);
                source->listeners = listeners;
526
                DEBUG0("Client removed");
527 528 529 530 531 532 533 534 535 536 537 538 539
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
            avl_insert(source->client_tree, client_node->key);
            listeners++;
540
            DEBUG0("Client added");
541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567
            stats_event_inc(NULL, "clients");
            stats_event_inc(source->mount, "connections");
            stats_event_args(source->mount, "listeners", "%d", listeners);
            source->listeners = listeners;

            /* we have to send cached headers for some data formats
            ** this is where we queue up the buffers to send
            */
            if (source->format->has_predata) {
                client = (client_t *)client_node->key;
                client->queue = source->format->get_predata(source->format);
            }

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
            avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, source_remove_client);
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
568

Michael Smith's avatar
Michael Smith committed
569 570
done:

571
    DEBUG0("Source exiting");
572 573 574
    if(!suppress_yp) {
        yp_remove(source);
    }
Jack Moffitt's avatar
Jack Moffitt committed
575

Michael Smith's avatar
Michael Smith committed
576 577 578 579
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

580 581 582 583
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
584 585 586
    avl_tree_wlock(global.source_tree);
    avl_delete(global.source_tree, source, source_remove_source);
    avl_tree_unlock(global.source_tree);
587

588 589 590
    /* we need to empty the client and pending trees */
    avl_tree_wlock(source->pending_tree);
    while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
591 592 593
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
594
            avl_delete(source->pending_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
595

596
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
597 598 599 600 601
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
602
            avl_delete(source->pending_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
603
        }
604 605
    }
    avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
606

607 608
    avl_tree_wlock(source->client_tree);
    while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
609 610 611
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
612
            avl_delete(source->client_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
613

614
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
615 616 617 618 619
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
620
            avl_delete(source->client_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
621
        }
622 623
    }
    avl_tree_unlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
624

625 626 627
    /* delete this sources stats */
    stats_event_dec(NULL, "sources");
    stats_event(source->mount, "listeners", NULL);
Jack Moffitt's avatar
Jack Moffitt committed
628

629 630 631
    global_lock();
    global.sources--;
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
632

Michael Smith's avatar
Michael Smith committed
633 634 635
    if(source->dumpfile)
        fclose(source->dumpfile);

636 637
    source_free_source(source);

638 639
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
640

641
    thread_exit(0);
Jack Moffitt's avatar
Jack Moffitt committed
642
      
643
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
644 645 646 647
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
648
    connection_t *cona = (connection_t *)a;
649
    connection_t *conb = (connection_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
650

651 652
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
653

654
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
655 656
}

657
int source_remove_client(void *key)
Jack Moffitt's avatar
Jack Moffitt committed
658
{
659
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
660 661 662 663
}

static int _free_client(void *key)
{
664 665 666 667 668 669 670 671 672 673
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");
    
    client_destroy(client);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
674
}
675

676 677
static int _parse_audio_info(source_t *source, char *s)
{
678 679 680 681
    char *token = NULL;
    char *pvar = NULL;
    char *variable = NULL;
    char *value = NULL;
682 683 684 685 686

    while ((token = strtok(s,";")) != NULL) {
        pvar = strchr(token, '=');
        if (pvar) {
            variable = (char *)malloc(pvar-token+1);
687
            strncpy(variable, token, pvar-token);    
688 689
            pvar++;
            if (strlen(pvar)) {
690
                value = util_url_unescape(pvar);
691 692
                util_dict_set(source->audio_info, variable, value);
                stats_event(source->mount, variable, value);
693 694 695
                if (value) {
                    free(value);
                }
696 697
            }
            if (variable) {
698
                free(variable);
699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796
            }
        }
        s = NULL;
    }
    return 1;
}

static void _add_yp_info(source_t *source, char *stat_name, 
            void *info, int type)
{
    int i;
    if (!info) {
        return;
    }
    for (i=0;i<source->num_yp_directories;i++) {
        switch (type) {
        case YP_SERVER_NAME:
        if (source->ypdata[i]->server_name) {
                free(source->ypdata[i]->server_name);
                }
                source->ypdata[i]->server_name = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->server_name, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_SERVER_DESC:
                if (source->ypdata[i]->server_desc) {
                    free(source->ypdata[i]->server_desc);
                }
                source->ypdata[i]->server_desc = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->server_desc, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_SERVER_GENRE:
                if (source->ypdata[i]->server_genre) {
                    free(source->ypdata[i]->server_genre);
                }
                source->ypdata[i]->server_genre = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->server_genre, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_SERVER_URL:
                if (source->ypdata[i]->server_url) {
                    free(source->ypdata[i]->server_url);
                }
                source->ypdata[i]->server_url = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->server_url, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_BITRATE:
                if (source->ypdata[i]->bitrate) {
                    free(source->ypdata[i]->bitrate);
                }
                source->ypdata[i]->bitrate = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->bitrate, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_AUDIO_INFO:
                if (source->ypdata[i]->audio_info) {
                    free(source->ypdata[i]->audio_info);
                }
                source->ypdata[i]->audio_info = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->audio_info, (char *)info);
                break;
        case YP_SERVER_TYPE:
                if (source->ypdata[i]->server_type) {
                    free(source->ypdata[i]->server_type);
                }
                source->ypdata[i]->server_type = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->server_type, (char *)info);
                break;
        case YP_CURRENT_SONG:
                if (source->ypdata[i]->current_song) {
                    free(source->ypdata[i]->current_song);
                }
                source->ypdata[i]->current_song = 
                    malloc(strlen((char *)info) +1);
                strcpy(source->ypdata[i]->current_song, (char *)info);
                stats_event(source->mount, stat_name, (char *)info);
                break;
        case YP_URL_TIMEOUT:
                source->ypdata[i]->yp_url_timeout = (int)info;
                break;
        case YP_LAST_TOUCH:
                source->ypdata[i]->yp_last_touch = (int)info;
                break;
        case YP_TOUCH_INTERVAL:
                source->ypdata[i]->yp_touch_interval = (int)info;
                break;
        }
    }
}