source.c 26.2 KB
Newer Older
1
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
2 3 4 5
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
6 7 8 9
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
10
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
11
#include <errno.h>
12 13

#ifndef _WIN32
14
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
15 16
#include <sys/time.h>
#include <sys/socket.h>
17
#else
18 19
#include <winsock2.h>
#include <windows.h>
20
#endif
Jack Moffitt's avatar
Jack Moffitt committed
21

Karl Heyes's avatar
Karl Heyes committed
22 23 24 25
#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
26 27 28 29 30 31

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
32
#include "logging.h"
33
#include "cfgfile.h"
34
#include "util.h"
brendan's avatar
brendan committed
35
#ifdef USE_YP
36
#include "geturl.h"
37
#endif
Jack Moffitt's avatar
Jack Moffitt committed
38
#include "source.h"
Michael Smith's avatar
Michael Smith committed
39
#include "format.h"
Michael Smith's avatar
Michael Smith committed
40
#include "auth.h"
Jack Moffitt's avatar
Jack Moffitt committed
41

42 43 44
#undef CATMODULE
#define CATMODULE "source"

Michael Smith's avatar
Michael Smith committed
45 46
#define MAX_FALLBACK_DEPTH 10

Jack Moffitt's avatar
Jack Moffitt committed
47 48 49
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
50
static int _parse_audio_info(source_t *source, char *s);
Jack Moffitt's avatar
Jack Moffitt committed
51

52
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
53 54
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
55
{
56
    source_t *src;
Jack Moffitt's avatar
Jack Moffitt committed
57

58
    src = (source_t *)malloc(sizeof(source_t));
59
    src->client = client;
60
    src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
61
    src->fallback_mount = NULL;
62 63 64 65 66
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
67
    src->running = 1;
68 69
    src->num_yp_directories = 0;
    src->listeners = 0;
70
    src->max_listeners = -1;
71
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
72 73
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
74
    src->audio_info = util_dict_new();
75
    src->yp_public = 0;
Michael Smith's avatar
Michael Smith committed
76 77
    src->fallback_override = 0;
    src->no_mount = 0;
Jack Moffitt's avatar
Jack Moffitt committed
78

Michael Smith's avatar
Michael Smith committed
79
    if(mountinfo != NULL) {
80 81
        if (mountinfo->fallback_mount != NULL)
            src->fallback_mount = strdup (mountinfo->fallback_mount);
Michael Smith's avatar
Michael Smith committed
82
        src->max_listeners = mountinfo->max_listeners;
83 84
        if (mountinfo->dumpfile != NULL)
            src->dumpfilename = strdup (mountinfo->dumpfile);
Michael Smith's avatar
Michael Smith committed
85 86 87 88 89
        if(mountinfo->auth_type != NULL)
            src->authenticator = auth_get_authenticator(
                    mountinfo->auth_type, mountinfo->auth_options);
        src->fallback_override = mountinfo->fallback_override;
        src->no_mount = mountinfo->no_mount;
Michael Smith's avatar
Michael Smith committed
90 91 92 93 94 95 96 97 98 99
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

100
    return src;
Jack Moffitt's avatar
Jack Moffitt committed
101 102
}

103 104 105 106 107
static int source_remove_source(void *key)
{
    return 1;
}

Michael Smith's avatar
Michael Smith committed
108 109 110 111
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
112
{
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
136 137
}

Michael Smith's avatar
Michael Smith committed
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
static source_t *source_find_mount_recursive(const char *mount, int depth)
{
    source_t *source = source_find_mount_raw(mount);
    mount_proxy *mountinfo;
    ice_config_t *config;
    char *fallback_mount;
    
    if(source == NULL) {
        if(depth > MAX_FALLBACK_DEPTH)
            return NULL;

        /* Look for defined mounts to find a fallback source */

        config = config_get_config();
        mountinfo = config->mounts;
        thread_mutex_lock(&(config_locks()->mounts_lock));
        config_release_config();

        while(mountinfo) {
            if(!strcmp(mountinfo->mountname, mount))
                break;
            mountinfo = mountinfo->next;
        }
        
        if(mountinfo)
            fallback_mount = mountinfo->fallback_mount;
        else
            fallback_mount = NULL;

        thread_mutex_unlock(&(config_locks()->mounts_lock));

        if(fallback_mount != NULL) {
            return source_find_mount_recursive(mount, depth+1);
        }
    }

    return source;
}

/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
    if (!mount)
        return NULL;

    return source_find_mount_recursive(mount, 0);
}

Jack Moffitt's avatar
Jack Moffitt committed
188 189
int source_compare_sources(void *arg, void *a, void *b)
{
190 191
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
192

193
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
194 195 196 197
}

int source_free_source(void *key)
{
198
    source_t *source = key;
brendan's avatar
brendan committed
199 200 201
#ifdef USE_YP
    int i;
#endif
Jack Moffitt's avatar
Jack Moffitt committed
202

203
    free(source->mount);
Michael Smith's avatar
Michael Smith committed
204
    free(source->fallback_mount);
205
    free(source->dumpfilename);
206
    client_destroy(source->client);
207 208 209
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
    source->format->free_plugin(source->format);
brendan's avatar
brendan committed
210
#ifdef USE_YP
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
211 212 213
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
214
#endif
215
    util_dict_free(source->audio_info);
216
    free(source);
Jack Moffitt's avatar
Jack Moffitt committed
217

218
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
219
}
220 221 222 223

client_t *source_find_client(source_t *source, int id)
{
    client_t fakeclient;
224
    void *result;
225 226 227 228 229 230
    connection_t fakecon;

    fakeclient.con = &fakecon;
    fakeclient.con->id = id;

    avl_tree_rlock(source->client_tree);
231
    if(avl_get_by_key(source->client_tree, &fakeclient, &result) == 0)
232 233 234 235 236 237 238 239
    {
        avl_tree_unlock(source->client_tree);
        return result;
    }

    avl_tree_unlock(source->client_tree);
    return NULL;
}
240
    
Jack Moffitt's avatar
Jack Moffitt committed
241 242 243

void *source_main(void *arg)
{
244
    source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
245
    source_t *fallback_source;
246 247 248 249 250 251 252 253
    char buffer[4096];
    long bytes, sbytes;
    int ret, timeout;
    client_t *client;
    avl_node *client_node;

    refbuf_t *refbuf, *abuf;
    int data_done;
Jack Moffitt's avatar
Jack Moffitt committed
254

brendan's avatar
brendan committed
255 256 257 258
#ifdef USE_YP
    char *s;
    long current_time;
    int    i;
259
    char *ai;
260
    int listen_url_size;
brendan's avatar
brendan committed
261
#endif
Jack Moffitt's avatar
Jack Moffitt committed
262

Michael Smith's avatar
Michael Smith committed
263 264 265 266 267 268 269 270
    long queue_limit;
    ice_config_t *config;
    char *hostname;
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
271
    timeout = config->source_timeout;
Michael Smith's avatar
Michael Smith committed
272 273 274
    hostname = config->hostname;
    port = config->port;

brendan's avatar
brendan committed
275
#ifdef USE_YP
276 277 278 279
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
Michael Smith's avatar
Michael Smith committed
280
                config->yp_url[i];
281
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
Michael Smith's avatar
Michael Smith committed
282
                config->yp_url_timeout[i];
283 284 285 286
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
287 288
#endif
    
Michael Smith's avatar
Michael Smith committed
289
    config_release_config();
290

291 292
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
293

294 295 296 297
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
Michael Smith's avatar
Michael Smith committed
298
    if (source_find_mount_raw(source->mount) != NULL) {
299 300 301 302
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
303 304 305 306
        global_lock();
        global.sources--;
        global_unlock();
        thread_rwlock_unlock(source->shutdown_rwlock);
307 308 309
        thread_exit(0);
        return NULL;
    }
310 311 312 313
    /* insert source onto source tree */
    avl_insert(global.source_tree, (void *)source);
    /* release write lock on global source tree */
    avl_tree_unlock(global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
314

315 316 317 318 319 320 321 322 323
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
324

325 326
    /* start off the statistics */
    source->listeners = 0;
327 328
    stats_event(source->mount, "listeners", "0");
    stats_event(source->mount, "type", source->format->format_description);
brendan's avatar
brendan committed
329
#ifdef USE_YP
330 331 332 333
    /* ice-* is icecast, icy-* is shoutcast */
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
334
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
335 336 337 338
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
    }
    if ((s = httpp_getvar(source->parser, "icy-name"))) {
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
339 340
    }
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
341 342 343 344
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
    if ((s = httpp_getvar(source->parser, "icy-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
345 346
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
347 348 349 350
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
    }
    if ((s = httpp_getvar(source->parser, "icy-genre"))) {
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
351 352
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
353 354 355 356
        add_yp_info(source, "bitrate", s, YP_BITRATE);
    }
    if ((s = httpp_getvar(source->parser, "icy-br"))) {
        add_yp_info(source, "bitrate", s, YP_BITRATE);
357 358
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
359
        add_yp_info(source, "server_description", s, YP_SERVER_DESC);
360
    }
361
    if ((s = httpp_getvar(source->parser, "ice-public"))) {
362
        stats_event(source->mount, "public", s);
363 364 365 366 367
        source->yp_public = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "icy-pub"))) {
        stats_event(source->mount, "public", s);
        source->yp_public = atoi(s);
368 369
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
370
        stats_event(source->mount, "audio_info", s);
371 372
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
373
            add_yp_info(source, "audio_info", 
374 375
                    ai,
                    YP_AUDIO_INFO);
376 377 378
            if (ai) {
                free(ai);
            }
379
        }
380 381
    }
    for (i=0;i<source->num_yp_directories;i++) {
382
        add_yp_info(source, "server_type", 
383 384
                     source->format->format_description,
                     YP_SERVER_TYPE);
385 386 387 388 389
        if (source->ypdata[i]->listen_url) {
            free(source->ypdata[i]->listen_url);
        }
        /* 6 for max size of port */
        listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
390
            strlen(hostname) + 
391
            strlen(":") + 6 + strlen(source->mount) + 1;
392 393
        source->ypdata[i]->listen_url = malloc(listen_url_size);
        sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
394
                hostname, port, source->mount);
395
    }
396

397
    if(source->yp_public) {
398

399
        current_time = time(NULL);
400

401
        for (i=0;i<source->num_yp_directories;i++) {
402 403
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
404
            /* Don't permit touch intervals of less than 30 seconds */
405 406 407
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
408
            source->ypdata[i]->yp_last_touch = 0;
409
        }
410
    }
411
#endif
412

413 414
    DEBUG0("Source creation complete");

Michael Smith's avatar
Michael Smith committed
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
    /*
    ** Now, if we have a fallback source and override is on, we want
    ** to steal it's clients, because it means we've come back online
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

    if(source->fallback_override && source->fallback_mount) {
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);
        avl_tree_unlock(global.source_tree);

        if(fallback_source) {
            /* we need to move the client and pending trees */
            avl_tree_wlock(fallback_source->pending_tree);
            while (avl_get_first(fallback_source->pending_tree)) {
                client_t *client = (client_t *)avl_get_first(
                        fallback_source->pending_tree)->key;
                avl_delete(fallback_source->pending_tree, client, 
                        source_remove_client);

                /* TODO: reset client local format data?  */
                avl_tree_wlock(source->pending_tree);
                avl_insert(source->pending_tree, (void *)client);
                avl_tree_unlock(source->pending_tree);
            }
            avl_tree_unlock(fallback_source->pending_tree);

            avl_tree_wlock(fallback_source->client_tree);
            while (avl_get_first(fallback_source->client_tree)) {
                client_t *client = (client_t *)avl_get_first(
                        fallback_source->client_tree)->key;

                avl_delete(fallback_source->client_tree, client, 
                        source_remove_client);

                /* TODO: reset client local format data?  */
                avl_tree_wlock(source->pending_tree);
                avl_insert(source->pending_tree, (void *)client);
                avl_tree_unlock(source->pending_tree);
            }
            avl_tree_unlock(fallback_source->client_tree);
        }
    }

460 461
    while (global.running == ICE_RUNNING && source->running) {
        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
462 463 464 465
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
466
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
467 468 469
        while (refbuf == NULL) {
            bytes = 0;
            while (bytes <= 0) {
470 471
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

472 473
                if (ret < 0 && sock_recoverable (sock_error()))
                   continue;
474
                if (ret <= 0) { /* timeout expired */
475 476
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
477 478 479
                    bytes = 0;
                    break;
                }
Jack Moffitt's avatar
Jack Moffitt committed
480

481
                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
Michael Smith's avatar
Michael Smith committed
482 483 484
                if (bytes == 0 || 
                        (bytes < 0 && !sock_recoverable(sock_error()))) 
                {
485 486
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
487
                    break;
488
                }
489 490
            }
            if (bytes <= 0) break;
491
            source->client->con->sent_bytes += bytes;
Michael Smith's avatar
Michael Smith committed
492 493
            ret = source->format->get_buffer(source->format, buffer, bytes, 
                    &refbuf);
Michael Smith's avatar
Michael Smith committed
494 495 496 497
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515
        }

        if (bytes <= 0) {
            INFO0("Removing source following disconnection");
            break;
        }

        /* we have a refbuf buffer, which a data block to be sent to 
        ** all clients.  if a client is not able to send the buffer
        ** immediately, it should store it on its queue for the next
        ** go around.
        **
        ** instead of sending the current block, a client should send
        ** all data in the queue, plus the current block, until either
        ** it runs out of data, or it hits a recoverable error like
        ** EAGAIN.  this will allow a client that got slightly lagged
        ** to catch back up if it can
        */
Jack Moffitt's avatar
Jack Moffitt committed
516

Michael Smith's avatar
Michael Smith committed
517 518 519 520 521 522 523 524 525 526 527 528
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

529 530
        /* acquire read lock on client_tree */
        avl_tree_rlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
531

532 533 534 535
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            /* acquire read lock on node */
            avl_node_wlock(client_node);
Jack Moffitt's avatar
Jack Moffitt committed
536

537 538 539
            client = (client_t *)client_node->key;
            
            data_done = 0;
Jack Moffitt's avatar
Jack Moffitt committed
540

541 542 543 544 545 546 547
            /* do we have any old buffers? */
            abuf = refbuf_queue_remove(&client->queue);
            while (abuf) {
                if (client->pos > 0)
                    bytes = abuf->len - client->pos;
                else
                    bytes = abuf->len;
Jack Moffitt's avatar
Jack Moffitt committed
548

549
                sbytes = source->format->write_buf_to_client(source->format,
550
                        client, &abuf->data[client->pos], bytes);
551
                if (sbytes >= 0) {
552
                    if(sbytes != bytes) {
553 554 555
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
556 557 558 559 560 561
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
562
                else {
Michael Smith's avatar
Michael Smith committed
563 564
                    DEBUG0("Client has unrecoverable error catching up. "
                            "Client has probably disconnected");
565
                    client->con->error = 1;
566
                    data_done = 1;
567
                    refbuf_release(abuf);
568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583
                    break;
                }
                
                /* we're done with that refbuf, release it and reset the pos */
                refbuf_release(abuf);
                client->pos = 0;

                abuf = refbuf_queue_remove(&client->queue);
            }
            
            /* now send or queue the new data */
            if (data_done) {
                refbuf_addref(refbuf);
                refbuf_queue_add(&client->queue, refbuf);
            } else {
                sbytes = source->format->write_buf_to_client(source->format,
584
                        client, refbuf->data, refbuf->len);
585
                if (sbytes >= 0) {
586 587
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
588
                        client->pos = sbytes;
589
                        refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
590
                        refbuf_queue_insert(&client->queue, refbuf);
591 592
                    }
                }
593
                else {
Michael Smith's avatar
Michael Smith committed
594 595
                    DEBUG0("Client had unrecoverable error with new data, "
                            "probably due to client disconnection");
596
                    client->con->error = 1;
597 598 599 600 601 602 603 604
                }
            }

            /* if the client is too slow, its queue will slowly build up.
            ** we need to make sure the client is keeping up with the
            ** data, so we'll kick any client who's queue gets to large.
            */
            if (refbuf_queue_length(&client->queue) > queue_limit) {
605
                DEBUG0("Client has fallen too far behind, removing");
606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
                client->con->error = 1;
            }

            /* release read lock on node */
            avl_node_unlock(client_node);

            /* get the next node */
            client_node = avl_get_next(client_node);
        }
        /* release read lock on client_tree */
        avl_tree_unlock(source->client_tree);

        refbuf_release(refbuf);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** delete bad clients **/
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
            if (client->con->error) {
                client_node = avl_get_next(client_node);
                avl_delete(source->client_tree, (void *)client, _free_client);
Michael Smith's avatar
Michael Smith committed
630 631 632
                source->listeners--;
                stats_event_args(source->mount, "listeners", "%d", 
                        source->listeners);
633
                DEBUG0("Client removed");
634 635 636 637 638 639 640 641 642 643 644
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
Michael Smith's avatar
Michael Smith committed
645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662
            if(source->max_listeners != -1 && 
                    source->listeners >= source->max_listeners) 
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                client = (client_t *)client_node->key;
                client_node = avl_get_next(client_node);
                avl_delete(source->pending_tree, (void *)client, _free_client);

                INFO0("Client deleted, exceeding maximum listeners for this "
                        "mountpoint.");
                continue;
            }
            
            /* Otherwise, the client is accepted, add it */
663
            avl_insert(source->client_tree, client_node->key);
Michael Smith's avatar
Michael Smith committed
664 665

            source->listeners++;
666
            DEBUG0("Client added");
667 668
            stats_event_inc(NULL, "clients");
            stats_event_inc(source->mount, "connections");
Michael Smith's avatar
Michael Smith committed
669 670
            stats_event_args(source->mount, "listeners", "%d", 
                    source->listeners);
671 672 673 674 675 676 677 678 679 680 681 682 683 684

            /* we have to send cached headers for some data formats
            ** this is where we queue up the buffers to send
            */
            if (source->format->has_predata) {
                client = (client_t *)client_node->key;
                client->queue = source->format->get_predata(source->format);
            }

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
685 686 687
            avl_delete(source->pending_tree, 
                    avl_get_first(source->pending_tree)->key, 
                    source_remove_client);
688 689 690 691 692 693 694 695
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
696

Michael Smith's avatar
Michael Smith committed
697 698
done:

699
    INFO1("Source \"%s\" exiting", source->mount);
700

brendan's avatar
brendan committed
701
#ifdef USE_YP
702
    if(source->yp_public) {
703 704
        yp_remove(source);
    }
705 706
#endif
    
Michael Smith's avatar
Michael Smith committed
707 708 709 710
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

711 712 713 714
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
715 716 717
    avl_tree_wlock(global.source_tree);
    avl_delete(global.source_tree, source, source_remove_source);
    avl_tree_unlock(global.source_tree);
718

719 720 721
    /* we need to empty the client and pending trees */
    avl_tree_wlock(source->pending_tree);
    while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
722 723 724
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
725
            avl_delete(source->pending_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
726

727
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
728 729 730 731 732
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
733
            avl_delete(source->pending_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
734
        }
735 736
    }
    avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
737

738 739
    avl_tree_wlock(source->client_tree);
    while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
740 741 742
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
743
            avl_delete(source->client_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
744

745
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
746 747 748 749 750
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
751
            avl_delete(source->client_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
752
        }
753 754
    }
    avl_tree_unlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
755

756 757 758
    /* delete this sources stats */
    stats_event_dec(NULL, "sources");
    stats_event(source->mount, "listeners", NULL);
Jack Moffitt's avatar
Jack Moffitt committed
759

760 761 762
    global_lock();
    global.sources--;
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
763

Michael Smith's avatar
Michael Smith committed
764 765 766
    if(source->dumpfile)
        fclose(source->dumpfile);

767 768
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
769

770 771
    source_free_source(source);

772
    thread_exit(0);
Jack Moffitt's avatar
Jack Moffitt committed
773
      
774
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
775 776 777 778
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
779 780 781 782 783
    client_t *clienta = (client_t *)a;
    client_t *clientb = (client_t *)b;

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
784

785 786
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
787

788
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
789 790
}

791
int source_remove_client(void *key)
Jack Moffitt's avatar
Jack Moffitt committed
792
{
793
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
794 795 796 797
}

static int _free_client(void *key)
{
798 799 800 801 802 803 804 805 806 807
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");
    
    client_destroy(client);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
808
}
809

810 811
static int _parse_audio_info(source_t *source, char *s)
{
812 813 814 815
    char *token = NULL;
    char *pvar = NULL;
    char *variable = NULL;
    char *value = NULL;
816 817 818 819 820

    while ((token = strtok(s,";")) != NULL) {
        pvar = strchr(token, '=');
        if (pvar) {
            variable = (char *)malloc(pvar-token+1);
821
            strncpy(variable, token, pvar-token);    
822
            variable[pvar-token] = 0;
823 824
            pvar++;
            if (strlen(pvar)) {
825
                value = util_url_unescape(pvar);
826 827
                util_dict_set(source->audio_info, variable, value);
                stats_event(source->mount, variable, value);
828 829 830
                if (value) {
                    free(value);
                }
831 832
            }
            if (variable) {
833
                free(variable);
834 835 836 837 838 839
            }
        }
        s = NULL;
    }
    return 1;
}