source.c 26.7 KB
Newer Older
1
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
2 3 4 5
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
6 7 8 9
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
10
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
11
#include <errno.h>
12 13

#ifndef _WIN32
14
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
15 16
#include <sys/time.h>
#include <sys/socket.h>
17
#else
18 19
#include <winsock2.h>
#include <windows.h>
20
#endif
Jack Moffitt's avatar
Jack Moffitt committed
21

Karl Heyes's avatar
Karl Heyes committed
22 23 24 25
#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
26 27 28 29 30 31

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
32
#include "logging.h"
33
#include "cfgfile.h"
34
#include "util.h"
brendan's avatar
brendan committed
35
#ifdef USE_YP
36
#include "geturl.h"
37
#endif
Jack Moffitt's avatar
Jack Moffitt committed
38
#include "source.h"
Michael Smith's avatar
Michael Smith committed
39
#include "format.h"
Michael Smith's avatar
Michael Smith committed
40
#include "auth.h"
Jack Moffitt's avatar
Jack Moffitt committed
41

42 43 44
#undef CATMODULE
#define CATMODULE "source"

Michael Smith's avatar
Michael Smith committed
45 46
#define MAX_FALLBACK_DEPTH 10

Jack Moffitt's avatar
Jack Moffitt committed
47 48 49
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
50
static int _parse_audio_info(source_t *source, char *s);
Jack Moffitt's avatar
Jack Moffitt committed
51

52
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
53 54
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
55
{
56
    source_t *src;
Jack Moffitt's avatar
Jack Moffitt committed
57

58
    src = (source_t *)malloc(sizeof(source_t));
59
    src->client = client;
60
    src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
61
    src->fallback_mount = NULL;
62 63 64 65 66
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
67
    src->running = 1;
68 69
    src->num_yp_directories = 0;
    src->listeners = 0;
70
    src->max_listeners = -1;
71
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
72 73
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
74
    src->audio_info = util_dict_new();
75
    src->yp_public = 0;
Michael Smith's avatar
Michael Smith committed
76 77
    src->fallback_override = 0;
    src->no_mount = 0;
78
    src->authenticator = NULL;
Jack Moffitt's avatar
Jack Moffitt committed
79

Michael Smith's avatar
Michael Smith committed
80
    if(mountinfo != NULL) {
81 82
        if (mountinfo->fallback_mount != NULL)
            src->fallback_mount = strdup (mountinfo->fallback_mount);
Michael Smith's avatar
Michael Smith committed
83
        src->max_listeners = mountinfo->max_listeners;
84 85
        if (mountinfo->dumpfile != NULL)
            src->dumpfilename = strdup (mountinfo->dumpfile);
Michael Smith's avatar
Michael Smith committed
86 87 88 89 90
        if(mountinfo->auth_type != NULL)
            src->authenticator = auth_get_authenticator(
                    mountinfo->auth_type, mountinfo->auth_options);
        src->fallback_override = mountinfo->fallback_override;
        src->no_mount = mountinfo->no_mount;
Michael Smith's avatar
Michael Smith committed
91 92 93 94 95 96 97 98 99 100
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

101
    return src;
Jack Moffitt's avatar
Jack Moffitt committed
102 103
}

104 105 106 107 108
static int source_remove_source(void *key)
{
    return 1;
}

Michael Smith's avatar
Michael Smith committed
109 110 111 112
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
113
{
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
137 138
}

Michael Smith's avatar
Michael Smith committed
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
static source_t *source_find_mount_recursive(const char *mount, int depth)
{
    source_t *source = source_find_mount_raw(mount);
    mount_proxy *mountinfo;
    ice_config_t *config;
    char *fallback_mount;
    
    if(source == NULL) {
        if(depth > MAX_FALLBACK_DEPTH)
            return NULL;

        /* Look for defined mounts to find a fallback source */

        config = config_get_config();
        mountinfo = config->mounts;
        thread_mutex_lock(&(config_locks()->mounts_lock));
        config_release_config();

        while(mountinfo) {
            if(!strcmp(mountinfo->mountname, mount))
                break;
            mountinfo = mountinfo->next;
        }
        
        if(mountinfo)
            fallback_mount = mountinfo->fallback_mount;
        else
            fallback_mount = NULL;

        thread_mutex_unlock(&(config_locks()->mounts_lock));

        if(fallback_mount != NULL) {
171
            return source_find_mount_recursive(fallback_mount, depth+1);
Michael Smith's avatar
Michael Smith committed
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
        }
    }

    return source;
}

/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
    if (!mount)
        return NULL;

    return source_find_mount_recursive(mount, 0);
}

Jack Moffitt's avatar
Jack Moffitt committed
189 190
int source_compare_sources(void *arg, void *a, void *b)
{
191 192
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
193

194
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
195 196 197 198
}

int source_free_source(void *key)
{
199
    source_t *source = key;
brendan's avatar
brendan committed
200 201 202
#ifdef USE_YP
    int i;
#endif
Jack Moffitt's avatar
Jack Moffitt committed
203

204
    free(source->mount);
Michael Smith's avatar
Michael Smith committed
205
    free(source->fallback_mount);
206
    free(source->dumpfilename);
207
    client_destroy(source->client);
208 209 210
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
    source->format->free_plugin(source->format);
brendan's avatar
brendan committed
211
#ifdef USE_YP
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
212 213 214
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
215
#endif
216
    util_dict_free(source->audio_info);
217
    free(source);
Jack Moffitt's avatar
Jack Moffitt committed
218

219
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
220
}
221 222 223 224

client_t *source_find_client(source_t *source, int id)
{
    client_t fakeclient;
225
    void *result;
226 227 228 229 230 231
    connection_t fakecon;

    fakeclient.con = &fakecon;
    fakeclient.con->id = id;

    avl_tree_rlock(source->client_tree);
232
    if(avl_get_by_key(source->client_tree, &fakeclient, &result) == 0)
233 234 235 236 237 238 239 240
    {
        avl_tree_unlock(source->client_tree);
        return result;
    }

    avl_tree_unlock(source->client_tree);
    return NULL;
}
241
    
Jack Moffitt's avatar
Jack Moffitt committed
242 243 244

void *source_main(void *arg)
{
245
    source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
246
    source_t *fallback_source;
247 248 249 250 251 252 253 254
    char buffer[4096];
    long bytes, sbytes;
    int ret, timeout;
    client_t *client;
    avl_node *client_node;

    refbuf_t *refbuf, *abuf;
    int data_done;
Jack Moffitt's avatar
Jack Moffitt committed
255

brendan's avatar
brendan committed
256 257 258 259
#ifdef USE_YP
    char *s;
    long current_time;
    int    i;
260
    char *ai;
brendan's avatar
brendan committed
261
#endif
Jack Moffitt's avatar
Jack Moffitt committed
262

Michael Smith's avatar
Michael Smith committed
263 264 265
    long queue_limit;
    ice_config_t *config;
    char *hostname;
266 267
    char *listenurl;
    int listen_url_size;
Michael Smith's avatar
Michael Smith committed
268 269 270 271 272
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
273
    timeout = config->source_timeout;
274
    hostname = strdup(config->hostname);
Michael Smith's avatar
Michael Smith committed
275 276
    port = config->port;

brendan's avatar
brendan committed
277
#ifdef USE_YP
278 279 280 281
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
Michael Smith's avatar
Michael Smith committed
282
                config->yp_url[i];
283
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
Michael Smith's avatar
Michael Smith committed
284
                config->yp_url_timeout[i];
285 286 287 288
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
289 290
#endif
    
Michael Smith's avatar
Michael Smith committed
291
    config_release_config();
292

293 294
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
295

296 297 298 299
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
Michael Smith's avatar
Michael Smith committed
300
    if (source_find_mount_raw(source->mount) != NULL) {
301 302 303 304
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
305 306 307 308
        global_lock();
        global.sources--;
        global_unlock();
        thread_rwlock_unlock(source->shutdown_rwlock);
309 310 311
        thread_exit(0);
        return NULL;
    }
312 313 314 315
    /* insert source onto source tree */
    avl_insert(global.source_tree, (void *)source);
    /* release write lock on global source tree */
    avl_tree_unlock(global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
316

317 318 319 320 321 322 323 324 325
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
326

327 328
    /* start off the statistics */
    source->listeners = 0;
329 330
    stats_event(source->mount, "listeners", "0");
    stats_event(source->mount, "type", source->format->format_description);
brendan's avatar
brendan committed
331
#ifdef USE_YP
332 333 334 335
    /* ice-* is icecast, icy-* is shoutcast */
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
336
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
337 338 339 340
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
    }
    if ((s = httpp_getvar(source->parser, "icy-name"))) {
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
341 342
    }
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
343 344 345 346
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
    if ((s = httpp_getvar(source->parser, "icy-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
347 348
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
349 350 351 352
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
    }
    if ((s = httpp_getvar(source->parser, "icy-genre"))) {
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
353 354
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
355 356 357 358
        add_yp_info(source, "bitrate", s, YP_BITRATE);
    }
    if ((s = httpp_getvar(source->parser, "icy-br"))) {
        add_yp_info(source, "bitrate", s, YP_BITRATE);
359 360
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
361
        add_yp_info(source, "server_description", s, YP_SERVER_DESC);
362
    }
363
    if ((s = httpp_getvar(source->parser, "ice-public"))) {
364
        stats_event(source->mount, "public", s);
365 366 367 368 369
        source->yp_public = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "icy-pub"))) {
        stats_event(source->mount, "public", s);
        source->yp_public = atoi(s);
370 371
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
372
        stats_event(source->mount, "audio_info", s);
373 374
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
375
            add_yp_info(source, "audio_info", 
376 377
                    ai,
                    YP_AUDIO_INFO);
378 379 380
            if (ai) {
                free(ai);
            }
381
        }
382 383
    }
    for (i=0;i<source->num_yp_directories;i++) {
384
        add_yp_info(source, "server_type", 
385 386
                     source->format->format_description,
                     YP_SERVER_TYPE);
387 388 389 390 391
        if (source->ypdata[i]->listen_url) {
            free(source->ypdata[i]->listen_url);
        }
        /* 6 for max size of port */
        listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
392
            strlen(hostname) + 
393
            strlen(":") + 6 + strlen(source->mount) + 1;
394 395
        source->ypdata[i]->listen_url = malloc(listen_url_size);
        sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
396
                hostname, port, source->mount);
397
    }
398

399
    if(source->yp_public) {
400

401
        current_time = time(NULL);
402

403
        for (i=0;i<source->num_yp_directories;i++) {
404 405
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
406
            /* Don't permit touch intervals of less than 30 seconds */
407 408 409
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
410
            source->ypdata[i]->yp_last_touch = 0;
411
        }
412
    }
413
#endif
414 415 416 417 418 419 420 421 422 423 424 425 426 427
    /* 6 for max size of port */
    listen_url_size = strlen("http://") + 
    strlen(hostname) + strlen(":") + 6 + strlen(source->mount) + 1;
    
    listenurl = malloc(listen_url_size);
    memset(listenurl, '\000', listen_url_size);
    sprintf(listenurl, "http://%s:%d%s", hostname, port, source->mount);
    stats_event(source->mount, "listenurl", listenurl);
    if (hostname) {
        free(hostname);
    }
    if (listenurl) {
        free(listenurl);
    }
428

429 430
    DEBUG0("Source creation complete");

Michael Smith's avatar
Michael Smith committed
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475
    /*
    ** Now, if we have a fallback source and override is on, we want
    ** to steal it's clients, because it means we've come back online
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

    if(source->fallback_override && source->fallback_mount) {
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);
        avl_tree_unlock(global.source_tree);

        if(fallback_source) {
            /* we need to move the client and pending trees */
            avl_tree_wlock(fallback_source->pending_tree);
            while (avl_get_first(fallback_source->pending_tree)) {
                client_t *client = (client_t *)avl_get_first(
                        fallback_source->pending_tree)->key;
                avl_delete(fallback_source->pending_tree, client, 
                        source_remove_client);

                /* TODO: reset client local format data?  */
                avl_tree_wlock(source->pending_tree);
                avl_insert(source->pending_tree, (void *)client);
                avl_tree_unlock(source->pending_tree);
            }
            avl_tree_unlock(fallback_source->pending_tree);

            avl_tree_wlock(fallback_source->client_tree);
            while (avl_get_first(fallback_source->client_tree)) {
                client_t *client = (client_t *)avl_get_first(
                        fallback_source->client_tree)->key;

                avl_delete(fallback_source->client_tree, client, 
                        source_remove_client);

                /* TODO: reset client local format data?  */
                avl_tree_wlock(source->pending_tree);
                avl_insert(source->pending_tree, (void *)client);
                avl_tree_unlock(source->pending_tree);
            }
            avl_tree_unlock(fallback_source->client_tree);
        }
    }

476 477
    while (global.running == ICE_RUNNING && source->running) {
        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
478 479 480 481
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
482
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
483 484 485
        while (refbuf == NULL) {
            bytes = 0;
            while (bytes <= 0) {
486 487
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

488 489
                if (ret < 0 && sock_recoverable (sock_error()))
                   continue;
490
                if (ret <= 0) { /* timeout expired */
491 492
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
493 494 495
                    bytes = 0;
                    break;
                }
Jack Moffitt's avatar
Jack Moffitt committed
496

497
                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
Michael Smith's avatar
Michael Smith committed
498 499 500
                if (bytes == 0 || 
                        (bytes < 0 && !sock_recoverable(sock_error()))) 
                {
501 502
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
503
                    break;
504
                }
505 506
            }
            if (bytes <= 0) break;
507
            source->client->con->sent_bytes += bytes;
Michael Smith's avatar
Michael Smith committed
508 509
            ret = source->format->get_buffer(source->format, buffer, bytes, 
                    &refbuf);
Michael Smith's avatar
Michael Smith committed
510 511 512 513
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531
        }

        if (bytes <= 0) {
            INFO0("Removing source following disconnection");
            break;
        }

        /* we have a refbuf buffer, which a data block to be sent to 
        ** all clients.  if a client is not able to send the buffer
        ** immediately, it should store it on its queue for the next
        ** go around.
        **
        ** instead of sending the current block, a client should send
        ** all data in the queue, plus the current block, until either
        ** it runs out of data, or it hits a recoverable error like
        ** EAGAIN.  this will allow a client that got slightly lagged
        ** to catch back up if it can
        */
Jack Moffitt's avatar
Jack Moffitt committed
532

Michael Smith's avatar
Michael Smith committed
533 534 535 536 537 538 539 540 541 542 543 544
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

545 546
        /* acquire read lock on client_tree */
        avl_tree_rlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
547

548 549 550 551
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            /* acquire read lock on node */
            avl_node_wlock(client_node);
Jack Moffitt's avatar
Jack Moffitt committed
552

553 554 555
            client = (client_t *)client_node->key;
            
            data_done = 0;
Jack Moffitt's avatar
Jack Moffitt committed
556

557 558 559 560 561 562 563
            /* do we have any old buffers? */
            abuf = refbuf_queue_remove(&client->queue);
            while (abuf) {
                if (client->pos > 0)
                    bytes = abuf->len - client->pos;
                else
                    bytes = abuf->len;
Jack Moffitt's avatar
Jack Moffitt committed
564

565
                sbytes = source->format->write_buf_to_client(source->format,
566
                        client, &abuf->data[client->pos], bytes);
567
                if (sbytes >= 0) {
568
                    if(sbytes != bytes) {
569 570 571
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
572 573 574 575 576 577
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
578
                else {
Michael Smith's avatar
Michael Smith committed
579 580
                    DEBUG0("Client has unrecoverable error catching up. "
                            "Client has probably disconnected");
581
                    client->con->error = 1;
582
                    data_done = 1;
583
                    refbuf_release(abuf);
584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599
                    break;
                }
                
                /* we're done with that refbuf, release it and reset the pos */
                refbuf_release(abuf);
                client->pos = 0;

                abuf = refbuf_queue_remove(&client->queue);
            }
            
            /* now send or queue the new data */
            if (data_done) {
                refbuf_addref(refbuf);
                refbuf_queue_add(&client->queue, refbuf);
            } else {
                sbytes = source->format->write_buf_to_client(source->format,
600
                        client, refbuf->data, refbuf->len);
601
                if (sbytes >= 0) {
602 603
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
604
                        client->pos = sbytes;
605
                        refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
606
                        refbuf_queue_insert(&client->queue, refbuf);
607 608
                    }
                }
609
                else {
Michael Smith's avatar
Michael Smith committed
610 611
                    DEBUG0("Client had unrecoverable error with new data, "
                            "probably due to client disconnection");
612
                    client->con->error = 1;
613 614 615 616 617 618 619 620
                }
            }

            /* if the client is too slow, its queue will slowly build up.
            ** we need to make sure the client is keeping up with the
            ** data, so we'll kick any client who's queue gets to large.
            */
            if (refbuf_queue_length(&client->queue) > queue_limit) {
621
                DEBUG0("Client has fallen too far behind, removing");
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645
                client->con->error = 1;
            }

            /* release read lock on node */
            avl_node_unlock(client_node);

            /* get the next node */
            client_node = avl_get_next(client_node);
        }
        /* release read lock on client_tree */
        avl_tree_unlock(source->client_tree);

        refbuf_release(refbuf);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** delete bad clients **/
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
            if (client->con->error) {
                client_node = avl_get_next(client_node);
                avl_delete(source->client_tree, (void *)client, _free_client);
Michael Smith's avatar
Michael Smith committed
646 647 648
                source->listeners--;
                stats_event_args(source->mount, "listeners", "%d", 
                        source->listeners);
649
                DEBUG0("Client removed");
650 651 652 653 654 655 656 657 658 659 660
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
Michael Smith's avatar
Michael Smith committed
661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678
            if(source->max_listeners != -1 && 
                    source->listeners >= source->max_listeners) 
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                client = (client_t *)client_node->key;
                client_node = avl_get_next(client_node);
                avl_delete(source->pending_tree, (void *)client, _free_client);

                INFO0("Client deleted, exceeding maximum listeners for this "
                        "mountpoint.");
                continue;
            }
            
            /* Otherwise, the client is accepted, add it */
679
            avl_insert(source->client_tree, client_node->key);
Michael Smith's avatar
Michael Smith committed
680 681

            source->listeners++;
682
            DEBUG0("Client added");
683 684
            stats_event_inc(NULL, "clients");
            stats_event_inc(source->mount, "connections");
Michael Smith's avatar
Michael Smith committed
685 686
            stats_event_args(source->mount, "listeners", "%d", 
                    source->listeners);
687 688 689 690 691 692 693 694 695 696 697 698 699 700

            /* we have to send cached headers for some data formats
            ** this is where we queue up the buffers to send
            */
            if (source->format->has_predata) {
                client = (client_t *)client_node->key;
                client->queue = source->format->get_predata(source->format);
            }

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
701 702 703
            avl_delete(source->pending_tree, 
                    avl_get_first(source->pending_tree)->key, 
                    source_remove_client);
704 705 706 707 708 709 710 711
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
712

Michael Smith's avatar
Michael Smith committed
713 714
done:

715
    INFO1("Source \"%s\" exiting", source->mount);
716

brendan's avatar
brendan committed
717
#ifdef USE_YP
718
    if(source->yp_public) {
719 720
        yp_remove(source);
    }
721 722
#endif
    
Michael Smith's avatar
Michael Smith committed
723 724 725 726
    avl_tree_rlock(global.source_tree);
    fallback_source = source_find_mount(source->fallback_mount);
    avl_tree_unlock(global.source_tree);

727 728 729 730
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
731 732 733
    avl_tree_wlock(global.source_tree);
    avl_delete(global.source_tree, source, source_remove_source);
    avl_tree_unlock(global.source_tree);
734

735 736 737
    /* we need to empty the client and pending trees */
    avl_tree_wlock(source->pending_tree);
    while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
738 739 740
        client_t *client = (client_t *)avl_get_first(
                source->pending_tree)->key;
        if(fallback_source) {
741
            avl_delete(source->pending_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
742

743
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
744 745 746 747 748
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
749
            avl_delete(source->pending_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
750
        }
751 752
    }
    avl_tree_unlock(source->pending_tree);
Michael Smith's avatar
Michael Smith committed
753

754 755
    avl_tree_wlock(source->client_tree);
    while (avl_get_first(source->client_tree)) {
Michael Smith's avatar
Michael Smith committed
756 757 758
        client_t *client = (client_t *)avl_get_first(source->client_tree)->key;

        if(fallback_source) {
759
            avl_delete(source->client_tree, client, source_remove_client);
Michael Smith's avatar
Michael Smith committed
760

761
            /* TODO: reset client local format data?  */
Michael Smith's avatar
Michael Smith committed
762 763 764 765 766
            avl_tree_wlock(fallback_source->pending_tree);
            avl_insert(fallback_source->pending_tree, (void *)client);
            avl_tree_unlock(fallback_source->pending_tree);
        }
        else {
767
            avl_delete(source->client_tree, client, _free_client);
Michael Smith's avatar
Michael Smith committed
768
        }
769 770
    }
    avl_tree_unlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
771

772 773 774
    /* delete this sources stats */
    stats_event_dec(NULL, "sources");
    stats_event(source->mount, "listeners", NULL);
Jack Moffitt's avatar
Jack Moffitt committed
775

776 777 778
    global_lock();
    global.sources--;
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
779

Michael Smith's avatar
Michael Smith committed
780 781 782
    if(source->dumpfile)
        fclose(source->dumpfile);

783 784
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
785

786 787
    source_free_source(source);

788
    thread_exit(0);
Jack Moffitt's avatar
Jack Moffitt committed
789
      
790
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
791 792 793 794
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
795 796 797 798 799
    client_t *clienta = (client_t *)a;
    client_t *clientb = (client_t *)b;

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
800

801 802
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
803

804
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
805 806
}

807
int source_remove_client(void *key)
Jack Moffitt's avatar
Jack Moffitt committed
808
{
809
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
810 811 812 813
}

static int _free_client(void *key)
{
814 815 816 817 818 819 820 821 822 823
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");
    
    client_destroy(client);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
824
}
825

826 827
static int _parse_audio_info(source_t *source, char *s)
{
828 829 830 831
    char *token = NULL;
    char *pvar = NULL;
    char *variable = NULL;
    char *value = NULL;
832 833 834 835 836

    while ((token = strtok(s,";")) != NULL) {
        pvar = strchr(token, '=');
        if (pvar) {
            variable = (char *)malloc(pvar-token+1);
837
            strncpy(variable, token, pvar-token);    
838
            variable[pvar-token] = 0;
839 840
            pvar++;
            if (strlen(pvar)) {
841
                value = util_url_unescape(pvar);
842 843
                util_dict_set(source->audio_info, variable, value);
                stats_event(source->mount, variable, value);
844 845 846
                if (value) {
                    free(value);
                }
847 848
            }
            if (variable) {
849
                free(variable);
850 851 852 853 854 855
            }
        }
        s = NULL;
    }
    return 1;
}