source.c 27.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
18 19 20 21
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
22
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
23
#include <errno.h>
24 25

#ifndef _WIN32
26
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
27 28
#include <sys/time.h>
#include <sys/socket.h>
29
#else
30 31
#include <winsock2.h>
#include <windows.h>
32
#endif
Jack Moffitt's avatar
Jack Moffitt committed
33

Karl Heyes's avatar
Karl Heyes committed
34 35 36 37
#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
38 39 40 41 42 43

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
44
#include "logging.h"
45
#include "cfgfile.h"
46
#include "util.h"
brendan's avatar
brendan committed
47
#ifdef USE_YP
48
#include "geturl.h"
49
#endif
Jack Moffitt's avatar
Jack Moffitt committed
50
#include "source.h"
Michael Smith's avatar
Michael Smith committed
51
#include "format.h"
Michael Smith's avatar
Michael Smith committed
52
#include "auth.h"
Jack Moffitt's avatar
Jack Moffitt committed
53

54 55 56
#undef CATMODULE
#define CATMODULE "source"

Michael Smith's avatar
Michael Smith committed
57 58
#define MAX_FALLBACK_DEPTH 10

Jack Moffitt's avatar
Jack Moffitt committed
59 60 61
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
62
static int _parse_audio_info(source_t *source, char *s);
Jack Moffitt's avatar
Jack Moffitt committed
63

64
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
65 66
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
67
{
68
    source_t *src;
Jack Moffitt's avatar
Jack Moffitt committed
69

70
    src = (source_t *)malloc(sizeof(source_t));
71
    src->client = client;
72
    src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
73
    src->fallback_mount = NULL;
74 75 76 77 78
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
79
    src->running = 1;
80 81
    src->num_yp_directories = 0;
    src->listeners = 0;
82
    src->max_listeners = -1;
83
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
84 85
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
86
    src->audio_info = util_dict_new();
87
    src->yp_public = 0;
Michael Smith's avatar
Michael Smith committed
88 89
    src->fallback_override = 0;
    src->no_mount = 0;
90
    src->authenticator = NULL;
Jack Moffitt's avatar
Jack Moffitt committed
91

Michael Smith's avatar
Michael Smith committed
92
    if(mountinfo != NULL) {
93 94
        if (mountinfo->fallback_mount != NULL)
            src->fallback_mount = strdup (mountinfo->fallback_mount);
Michael Smith's avatar
Michael Smith committed
95
        src->max_listeners = mountinfo->max_listeners;
96 97
        if (mountinfo->dumpfile != NULL)
            src->dumpfilename = strdup (mountinfo->dumpfile);
Michael Smith's avatar
Michael Smith committed
98 99 100 101 102
        if(mountinfo->auth_type != NULL)
            src->authenticator = auth_get_authenticator(
                    mountinfo->auth_type, mountinfo->auth_options);
        src->fallback_override = mountinfo->fallback_override;
        src->no_mount = mountinfo->no_mount;
Michael Smith's avatar
Michael Smith committed
103 104 105 106 107 108 109 110 111 112
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

113
    return src;
Jack Moffitt's avatar
Jack Moffitt committed
114 115
}

116 117 118 119 120
/* Key callback that deliberately leaves the key alone: it is passed to
 * avl_delete() on the global source tree so the source itself survives
 * removal of its node.  Always returns 1.
 */
static int source_remove_source(void *key)
{
    (void)key;  /* intentionally unused */

    return 1;
}

Michael Smith's avatar
Michael Smith committed
121 122 123 124
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
125
{
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
149 150
}

Michael Smith's avatar
Michael Smith committed
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
/* Resolve a mount name to a source, following configured fallbacks.
 *
 * The name is first looked up with source_find_mount_raw().  If nothing is
 * found, the configured mounts are searched for an entry with this name
 * and the lookup is repeated with that entry's fallback mount.  depth is
 * the number of fallbacks followed so far; once it exceeds
 * MAX_FALLBACK_DEPTH the search stops, which bounds the length of the
 * fallback chain.
 *
 * The fallback name is copied while mounts_lock is held, so the recursive
 * lookup never reads through a pointer into the mount list after the lock
 * has been dropped.
 *
 * Returns the source found, or NULL if there is none (a failure to
 * allocate the temporary copy is treated as "no fallback").
 */
static source_t *source_find_mount_recursive(const char *mount, int depth)
{
    source_t *source = source_find_mount_raw(mount);
    mount_proxy *mountinfo;
    ice_config_t *config;
    char *fallback_mount = NULL;
    size_t len;

    if(source != NULL)
        return source;

    if(depth > MAX_FALLBACK_DEPTH)
        return NULL;

    /* Look for defined mounts to find a fallback source */

    config = config_get_config();
    mountinfo = config->mounts;
    thread_mutex_lock(&(config_locks()->mounts_lock));
    config_release_config();

    while(mountinfo) {
        if(!strcmp(mountinfo->mountname, mount))
            break;
        mountinfo = mountinfo->next;
    }

    /* take a private copy while the list is still protected */
    if(mountinfo && mountinfo->fallback_mount) {
        len = strlen(mountinfo->fallback_mount) + 1;
        fallback_mount = malloc(len);
        if(fallback_mount != NULL)
            memcpy(fallback_mount, mountinfo->fallback_mount, len);
    }

    thread_mutex_unlock(&(config_locks()->mounts_lock));

    if(fallback_mount == NULL)
        return NULL;

    source = source_find_mount_recursive(fallback_mount, depth+1);
    free(fallback_mount);

    return source;
}

/* Find the source serving `mount`, following fallbacks when the mount
** itself has no source.  The caller must already hold a read lock on the
** global source tree.  Returns NULL for a NULL mount or when nothing is
** found.
*/
source_t *source_find_mount(const char *mount)
{
    /* start the fallback walk at depth 0 */
    return (mount != NULL) ? source_find_mount_recursive(mount, 0) : NULL;
}

Jack Moffitt's avatar
Jack Moffitt committed
201 202
/* Ordering function for sources: compares the mount names of the two
 * sources a and b with strcmp() and returns its result.  arg is unused.
 */
int source_compare_sources(void *arg, void *a, void *b)
{
    const char *lhs = ((source_t *)a)->mount;
    const char *rhs = ((source_t *)b)->mount;

    (void)arg;

    return strcmp(lhs, rhs);
}

int source_free_source(void *key)
{
211
    source_t *source = key;
brendan's avatar
brendan committed
212 213 214
#ifdef USE_YP
    int i;
#endif
Jack Moffitt's avatar
Jack Moffitt committed
215

216
    free(source->mount);
Michael Smith's avatar
Michael Smith committed
217
    free(source->fallback_mount);
218
    free(source->dumpfilename);
219
    client_destroy(source->client);
220 221 222
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
    source->format->free_plugin(source->format);
brendan's avatar
brendan committed
223
#ifdef USE_YP
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
224 225 226
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
227
#endif
228
    util_dict_free(source->audio_info);
229
    free(source);
Jack Moffitt's avatar
Jack Moffitt committed
230

231
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
232
}
233 234 235 236

/* Look up a client of `source` by connection id.
 *
 * A throw-away client/connection pair carrying only the id is used as
 * the search key (presumably the tree's comparison function looks at
 * con->id only - confirm against _compare_clients).  The client tree is
 * read-locked for the duration of the lookup and unlocked before
 * returning.
 *
 * Returns the client found, or NULL if the lookup does not succeed.
 */
client_t *source_find_client(source_t *source, int id)
{
    connection_t probe_con;
    client_t probe;
    void *found;
    client_t *match = NULL;

    probe_con.id = id;
    probe.con = &probe_con;

    avl_tree_rlock(source->client_tree);
    if (avl_get_by_key(source->client_tree, &probe, &found) == 0)
        match = found;
    avl_tree_unlock(source->client_tree);

    return match;
}
253
    
Jack Moffitt's avatar
Jack Moffitt committed
254 255 256

void *source_main(void *arg)
{
257
    source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
258
    source_t *fallback_source;
259 260 261 262 263 264 265 266
    char buffer[4096];
    long bytes, sbytes;
    int ret, timeout;
    client_t *client;
    avl_node *client_node;

    refbuf_t *refbuf, *abuf;
    int data_done;
Jack Moffitt's avatar
Jack Moffitt committed
267

brendan's avatar
brendan committed
268 269 270 271
#ifdef USE_YP
    char *s;
    long current_time;
    int    i;
272
    char *ai;
brendan's avatar
brendan committed
273
#endif
Jack Moffitt's avatar
Jack Moffitt committed
274

Michael Smith's avatar
Michael Smith committed
275 276 277
    long queue_limit;
    ice_config_t *config;
    char *hostname;
278 279
    char *listenurl;
    int listen_url_size;
Michael Smith's avatar
Michael Smith committed
280 281 282 283 284
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
285
    timeout = config->source_timeout;
286
    hostname = strdup(config->hostname);
Michael Smith's avatar
Michael Smith committed
287 288
    port = config->port;

brendan's avatar
brendan committed
289
#ifdef USE_YP
290 291 292 293
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
Michael Smith's avatar
Michael Smith committed
294
                config->yp_url[i];
295
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
Michael Smith's avatar
Michael Smith committed
296
                config->yp_url_timeout[i];
297 298 299 300
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
301 302
#endif
    
Michael Smith's avatar
Michael Smith committed
303
    config_release_config();
304

305 306
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
307

308 309 310 311
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
Michael Smith's avatar
Michael Smith committed
312
    if (source_find_mount_raw(source->mount) != NULL) {
313 314 315 316
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
317 318 319 320
        global_lock();
        global.sources--;
        global_unlock();
        thread_rwlock_unlock(source->shutdown_rwlock);
321 322 323
        thread_exit(0);
        return NULL;
    }
324 325 326 327
    /* insert source onto source tree */
    avl_insert(global.source_tree, (void *)source);
    /* release write lock on global source tree */
    avl_tree_unlock(global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
328

329 330 331 332 333 334 335 336 337
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
338

339 340
    /* start off the statistics */
    source->listeners = 0;
341 342
    stats_event(source->mount, "listeners", "0");
    stats_event(source->mount, "type", source->format->format_description);
brendan's avatar
brendan committed
343
#ifdef USE_YP
344 345 346 347
    /* ice-* is icecast, icy-* is shoutcast */
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
348
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
349 350 351 352
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
    }
    if ((s = httpp_getvar(source->parser, "icy-name"))) {
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
353 354
    }
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
355 356 357 358
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
    if ((s = httpp_getvar(source->parser, "icy-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
359 360
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
361 362 363 364
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
    }
    if ((s = httpp_getvar(source->parser, "icy-genre"))) {
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
365 366
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
367 368 369 370
        add_yp_info(source, "bitrate", s, YP_BITRATE);
    }
    if ((s = httpp_getvar(source->parser, "icy-br"))) {
        add_yp_info(source, "bitrate", s, YP_BITRATE);
371 372
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
373
        add_yp_info(source, "server_description", s, YP_SERVER_DESC);
374
    }
375
    if ((s = httpp_getvar(source->parser, "ice-public"))) {
376
        stats_event(source->mount, "public", s);
377 378 379 380 381
        source->yp_public = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "icy-pub"))) {
        stats_event(source->mount, "public", s);
        source->yp_public = atoi(s);
382 383
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
384
        stats_event(source->mount, "audio_info", s);
385 386
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
387
            add_yp_info(source, "audio_info", 
388 389
                    ai,
                    YP_AUDIO_INFO);
390 391 392
            if (ai) {
                free(ai);
            }
393
        }
394 395
    }
    for (i=0;i<source->num_yp_directories;i++) {
396
        add_yp_info(source, "server_type", 
397 398
                     source->format->format_description,
                     YP_SERVER_TYPE);
399 400 401 402 403
        if (source->ypdata[i]->listen_url) {
            free(source->ypdata[i]->listen_url);
        }
        /* 6 for max size of port */
        listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
404
            strlen(hostname) + 
405
            strlen(":") + 6 + strlen(source->mount) + 1;
406 407
        source->ypdata[i]->listen_url = malloc(listen_url_size);
        sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
408
                hostname, port, source->mount);
409
    }
410

411
    if(source->yp_public) {
412

413
        current_time = time(NULL);
414

415
        for (i=0;i<source->num_yp_directories;i++) {
416 417
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
418
            /* Don't permit touch intervals of less than 30 seconds */
419 420 421
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
422
            source->ypdata[i]->yp_last_touch = 0;
423
        }
424
    }
425
#endif
426 427 428 429 430 431 432 433 434 435 436 437 438 439
    /* 6 for max size of port */
    listen_url_size = strlen("http://") + 
    strlen(hostname) + strlen(":") + 6 + strlen(source->mount) + 1;
    
    listenurl = malloc(listen_url_size);
    memset(listenurl, '\000', listen_url_size);
    sprintf(listenurl, "http://%s:%d%s", hostname, port, source->mount);
    stats_event(source->mount, "listenurl", listenurl);
    if (hostname) {
        free(hostname);
    }
    if (listenurl) {
        free(listenurl);
    }
440

441 442
    DEBUG0("Source creation complete");

Michael Smith's avatar
Michael Smith committed
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
    /*
    ** Now, if we have a fallback source and override is on, we want
    ** to steal its clients, because it means we've come back online
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

    if(source->fallback_override && source->fallback_mount) {
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);
        avl_tree_unlock(global.source_tree);

        if(fallback_source) {
            /* we need to move the client and pending trees */
            avl_tree_wlock(fallback_source->pending_tree);
            while (avl_get_first(fallback_source->pending_tree)) {
                client_t *client = (client_t *)avl_get_first(
                        fallback_source->pending_tree)->key;
                avl_delete(fallback_source->pending_tree, client, 
                        source_remove_client);

                /* TODO: reset client local format data?  */
                avl_tree_wlock(source->pending_tree);
                avl_insert(source->pending_tree, (void *)client);
                avl_tree_unlock(source->pending_tree);
            }
            avl_tree_unlock(fallback_source->pending_tree);

            avl_tree_wlock(fallback_source->client_tree);
            while (avl_get_first(fallback_source->client_tree)) {
                client_t *client = (client_t *)avl_get_first(
                        fallback_source->client_tree)->key;

                avl_delete(fallback_source->client_tree, client, 
                        source_remove_client);

                /* TODO: reset client local format data?  */
                avl_tree_wlock(source->pending_tree);
                avl_insert(source->pending_tree, (void *)client);
                avl_tree_unlock(source->pending_tree);
            }
            avl_tree_unlock(fallback_source->client_tree);
        }
    }

488 489
    while (global.running == ICE_RUNNING && source->running) {
        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
490 491 492 493
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
494
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
495 496 497
        while (refbuf == NULL) {
            bytes = 0;
            while (bytes <= 0) {
498 499
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

500 501
                if (ret < 0 && sock_recoverable (sock_error()))
                   continue;
502
                if (ret <= 0) { /* timeout expired */
503 504
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
505 506 507
                    bytes = 0;
                    break;
                }
Jack Moffitt's avatar
Jack Moffitt committed
508

509
                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
Michael Smith's avatar
Michael Smith committed
510 511 512
                if (bytes == 0 || 
                        (bytes < 0 && !sock_recoverable(sock_error()))) 
                {
513 514
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
515
                    break;
516
                }
517 518
            }
            if (bytes <= 0) break;
519
            source->client->con->sent_bytes += bytes;
Michael Smith's avatar
Michael Smith committed
520 521
            ret = source->format->get_buffer(source->format, buffer, bytes, 
                    &refbuf);
Michael Smith's avatar
Michael Smith committed
522 523 524 525
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
        }

        if (bytes <= 0) {
            INFO0("Removing source following disconnection");
            break;
        }

        /* we have a refbuf buffer, which is a data block to be sent to
        ** all clients.  if a client is not able to send the buffer
        ** immediately, it should store it on its queue for the next
        ** go around.
        **
        ** instead of sending the current block, a client should send
        ** all data in the queue, plus the current block, until either
        ** it runs out of data, or it hits a recoverable error like
        ** EAGAIN.  this will allow a client that got slightly lagged
        ** to catch back up if it can
        */
Jack Moffitt's avatar
Jack Moffitt committed
544

Michael Smith's avatar
Michael Smith committed
545 546 547 548 549 550 551 552 553 554 555 556
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

557 558
        /* acquire read lock on client_tree */
        avl_tree_rlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
559

560 561 562 563
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            /* acquire write lock on node */
            avl_node_wlock(client_node);
Jack Moffitt's avatar
Jack Moffitt committed
564

565 566 567
            client = (client_t *)client_node->key;
            
            data_done = 0;
Jack Moffitt's avatar
Jack Moffitt committed
568

569 570 571 572 573 574 575
            /* do we have any old buffers? */
            abuf = refbuf_queue_remove(&client->queue);
            while (abuf) {
                if (client->pos > 0)
                    bytes = abuf->len - client->pos;
                else
                    bytes = abuf->len;
Jack Moffitt's avatar
Jack Moffitt committed
576

577
                sbytes = source->format->write_buf_to_client(source->format,
578
                        client, &abuf->data[client->pos], bytes);
579
                if (sbytes >= 0) {
580
                    if(sbytes != bytes) {
581 582 583
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
584 585 586 587 588 589
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
590
                else {
Michael Smith's avatar
Michael Smith committed
591 592
                    DEBUG0("Client has unrecoverable error catching up. "
                            "Client has probably disconnected");
593
                    client->con->error = 1;
594
                    data_done = 1;
595
                    refbuf_release(abuf);
596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611
                    break;
                }
                
                /* we're done with that refbuf, release it and reset the pos */
                refbuf_release(abuf);
                client->pos = 0;

                abuf = refbuf_queue_remove(&client->queue);
            }
            
            /* now send or queue the new data */
            if (data_done) {
                refbuf_addref(refbuf);
                refbuf_queue_add(&client->queue, refbuf);
            } else {
                sbytes = source->format->write_buf_to_client(source->format,
612
                        client, refbuf->data, refbuf->len);
613
                if (sbytes >= 0) {
614 615
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
616
                        client->pos = sbytes;
617
                        refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
618
                        refbuf_queue_insert(&client->queue, refbuf);
619 620
                    }
                }
621
                else {
Michael Smith's avatar
Michael Smith committed
622 623
                    DEBUG0("Client had unrecoverable error with new data, "
                            "probably due to client disconnection");
624
                    client->con->error = 1;
625 626 627 628 629 630 631 632
                }
            }

            /* if the client is too slow, its queue will slowly build up.
            ** we need to make sure the client is keeping up with the
            ** data, so we'll kick any client who's queue gets to large.
            */
            if (refbuf_queue_length(&client->queue) > queue_limit) {
633
                DEBUG0("Client has fallen too far behind, removing");
634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657
                client->con->error = 1;
            }

            /* release lock on node */
            avl_node_unlock(client_node);

            /* get the next node */
            client_node = avl_get_next(client_node);
        }
        /* release read lock on client_tree */
        avl_tree_unlock(source->client_tree);

        refbuf_release(refbuf);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** delete bad clients **/
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
            if (client->con->error) {
                client_node = avl_get_next(client_node);
                avl_delete(source->client_tree, (void *)client, _free_client);
Michael Smith's avatar
Michael Smith committed
658 659 660
                source->listeners--;
                stats_event_args(source->mount, "listeners", "%d", 
                        source->listeners);
661
                DEBUG0("Client removed");
662 663 664 665 666 667 668 669 670 671 672
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
Michael Smith's avatar
Michael Smith committed
673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690
            if(source->max_listeners != -1 && 
                    source->listeners >= source->max_listeners) 
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                client = (client_t *)client_node->key;
                client_node = avl_get_next(client_node);
                avl_delete(source->pending_tree, (void *)client, _free_client);

                INFO0("Client deleted, exceeding maximum listeners for this "
                        "mountpoint.");
                continue;
            }
            
            /* Otherwise, the client is accepted, add it */
691
            avl_insert(source->client_tree, client_node->key);
Michael Smith's avatar
Michael Smith committed
692 693

            source->listeners++;
694
            DEBUG0("Client added");
695 696
            stats_event_inc(NULL, "clients");
            stats_event_inc(source->mount, "connections");
Michael Smith's avatar
Michael Smith committed
697 698
            stats_event_args(source->mount, "listeners", "%d", 
                    source->listeners);
699 700 701 702 703 704 705 706 707 708 709 710 711 712

            /* we have to send cached headers for some data formats
            ** this is where we queue up the buffers to send
            */
            if (source->format->has_predata) {
                client = (client_t *)client_node->key;
                client->queue = source->format->get_predata(source->format);
            }

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
713 714 715
            avl_delete(source->pending_tree, 
                    avl_get_first(source->pending_tree)->key, 
                    source_remove_client);
716 717 718 719 720 721 722 723
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
724

Michael Smith's avatar
Michael Smith committed
725 726
done:

727
    INFO1("Source \"%s\" exiting", source->mount);
728

brendan's avatar
brendan committed
729
#ifdef USE_YP
730
    if(source->yp_public) {
731 732
        yp_remove(source);
    }
733 734
#endif
    
Michael Smith's avatar
Michael Smith committed
735 736 737 738
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

739 740 741 742
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
743 744 745
    avl_tree_wlock(global.source_tree);
    avl_delete(global.source_tree, source, source_remove_source);
    avl_tree_unlock(global.source_tree);
746

747 748 749
    /* we need to empty the client and pending trees */
    avl_tree_wlock(source->pending_tree);
    while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
750 751 752
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
753
            avl_delete(source->pending_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
754

755
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
756 757 758 759 760
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
761
            avl_delete(source->pending_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
762
        }
763 764
    }
    avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
765

766 767
    avl_tree_wlock(source->client_tree);
    while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
768 769 770
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
771
            avl_delete(source->client_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
772

773
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
774 775 776 777 778
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
779
            avl_delete(source->client_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
780
        }
781 782
    }
    avl_tree_unlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
783

784 785 786
    /* delete this sources stats */
    stats_event_dec(NULL, "sources");
    stats_event(source->mount, "listeners", NULL);
Jack Moffitt's avatar
Jack Moffitt committed
787

788 789 790
    global_lock();
    global.sources--;
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
791

Michael Smith's avatar
Michael Smith committed
792 793 794
    if(source->dumpfile)
        fclose(source->dumpfile);

795 796
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
797

798 799
    source_free_source(source);

800
    thread_exit(0);
Jack Moffitt's avatar
Jack Moffitt committed
801
      
802
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
803 804 805 806
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
807 808 809 810 811
    client_t *clienta = (client_t *)a;
    client_t *clientb = (client_t *)b;

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
812

813 814
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
815

816
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
817 818
}

819
/* Removal callback that leaves the client untouched.
 *
 * Passed to avl_delete() in this file when a client has to be unlinked from
 * a tree without being destroyed, e.g. just before it is re-inserted into a
 * fallback source's pending tree.
 *
 * key is ignored.  Always returns 1.
 */
int source_remove_client(void *key)
{
    (void)key; /* deliberately unused */

    return 1;
}

static int _free_client(void *key)
{
826 827 828 829 830 831 832 833 834 835
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");
    
    client_destroy(client);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
836
}
837

838 839
/* Parse a ';'-separated list of "name=value" pairs and record each pair.
 *
 * For every field that contains an '=' followed by a non-empty value, the
 * value is passed through util_url_unescape() and the pair is stored in
 * source->audio_info and published with stats_event() on source->mount.
 * Fields without '=' or with an empty value are ignored, as are empty
 * fields (";;").
 *
 * source  source whose audio_info dictionary and mount stats are updated.
 * s       string to parse.  It is modified in place: each ';' that ends a
 *         field is overwritten with '\0' (the same effect strtok() had).
 *
 * Always returns 1, matching the previous behaviour; a pair that cannot be
 * processed (allocation or unescape failure) is skipped.
 */
static int _parse_audio_info(source_t *source, char *s)
{
    char *token;
    char *next;

    /* nothing to do; strtok(NULL, ...) here would have resumed whatever
     * string some earlier, unrelated strtok() call was working on */
    if (source == NULL || s == NULL)
        return 1;

    /* Tokenise by hand instead of with strtok(): strtok() keeps hidden
     * static state, which is not safe in a threaded program and can be
     * clobbered by any function called from inside the loop. */
    for (token = s; token != NULL; token = next) {
        char *field_end;
        char *eq;
        char *variable;
        char *value;
        size_t name_len;

        /* skip empty fields */
        while (*token == ';')
            token++;
        if (*token == '\0')
            break;

        /* terminate this field and remember where the next one starts */
        field_end = strchr(token, ';');
        if (field_end != NULL) {
            *field_end = '\0';
            next = field_end + 1;
        } else {
            next = NULL;
        }

        eq = strchr(token, '=');
        if (eq == NULL || eq[1] == '\0')
            continue;   /* no '=' or empty value: nothing to record */

        name_len = (size_t)(eq - token);
        variable = malloc(name_len + 1);
        if (variable == NULL)
            continue;   /* out of memory: drop this pair */
        memcpy(variable, token, name_len);
        variable[name_len] = '\0';

        /* the previous code guarded free(value), so a NULL result is
         * evidently possible; do not pass NULL on as a value */
        value = util_url_unescape(eq + 1);
        if (value != NULL) {
            util_dict_set(source->audio_info, variable, value);
            stats_event(source->mount, variable, value);
            free(value);
        }

        free(variable);
    }

    return 1;
}