source.c 26.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
18 19 20 21
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
22
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
23
#include <errno.h>
24 25

#ifndef _WIN32
26
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
27 28
#include <sys/time.h>
#include <sys/socket.h>
29
#else
30 31
#include <winsock2.h>
#include <windows.h>
32
#endif
Jack Moffitt's avatar
Jack Moffitt committed
33

Karl Heyes's avatar
Karl Heyes committed
34 35 36 37
#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
38 39 40 41 42 43

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
44
#include "logging.h"
45
#include "cfgfile.h"
46
#include "util.h"
brendan's avatar
brendan committed
47
#ifdef USE_YP
48
#include "geturl.h"
49
#endif
Jack Moffitt's avatar
Jack Moffitt committed
50
#include "source.h"
Michael Smith's avatar
Michael Smith committed
51
#include "format.h"
Michael Smith's avatar
Michael Smith committed
52
#include "auth.h"
Jack Moffitt's avatar
Jack Moffitt committed
53

54 55 56
#undef CATMODULE
#define CATMODULE "source"

Michael Smith's avatar
Michael Smith committed
57 58
#define MAX_FALLBACK_DEPTH 10

59 60
mutex_t move_clients_mutex;

Jack Moffitt's avatar
Jack Moffitt committed
61 62 63
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
64
static int _parse_audio_info(source_t *source, char *s);
Jack Moffitt's avatar
Jack Moffitt committed
65

66
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
67 68
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
69
{
70
    source_t *src;
Jack Moffitt's avatar
Jack Moffitt committed
71

72
    src = (source_t *)malloc(sizeof(source_t));
73
    src->client = client;
74
    src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
75
    src->fallback_mount = NULL;
76 77 78 79 80
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
81
    src->running = 1;
82 83
    src->num_yp_directories = 0;
    src->listeners = 0;
84
    src->max_listeners = -1;
85
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
86 87
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
88
    src->audio_info = util_dict_new();
89
    src->yp_public = 0;
Michael Smith's avatar
Michael Smith committed
90 91
    src->fallback_override = 0;
    src->no_mount = 0;
92
    src->authenticator = NULL;
Jack Moffitt's avatar
Jack Moffitt committed
93

Michael Smith's avatar
Michael Smith committed
94
    if(mountinfo != NULL) {
95 96
        if (mountinfo->fallback_mount != NULL)
            src->fallback_mount = strdup (mountinfo->fallback_mount);
Michael Smith's avatar
Michael Smith committed
97
        src->max_listeners = mountinfo->max_listeners;
98 99
        if (mountinfo->dumpfile != NULL)
            src->dumpfilename = strdup (mountinfo->dumpfile);
Michael Smith's avatar
Michael Smith committed
100 101 102 103 104
        if(mountinfo->auth_type != NULL)
            src->authenticator = auth_get_authenticator(
                    mountinfo->auth_type, mountinfo->auth_options);
        src->fallback_override = mountinfo->fallback_override;
        src->no_mount = mountinfo->no_mount;
Michael Smith's avatar
Michael Smith committed
105 106 107 108 109 110 111 112 113 114
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

115
    return src;
Jack Moffitt's avatar
Jack Moffitt committed
116 117
}

Michael Smith's avatar
Michael Smith committed
118 119 120 121
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
122
{
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
146 147
}

148 149

/* Search for mount, if the mount is there but not currently running then
Karl Heyes's avatar
Karl Heyes committed
150
 * check the fallback, and so on.  Must have a global source lock to call
151 152 153
 * this function.
 */
source_t *source_find_mount (const char *mount)
Michael Smith's avatar
Michael Smith committed
154
{
155
    source_t *source = NULL;
Michael Smith's avatar
Michael Smith committed
156
    ice_config_t *config;
157 158 159 160 161 162 163 164 165 166 167 168
    mount_proxy *mountinfo;
    int depth = 0;

    config = config_get_config();
    while (mount != NULL)
    {
        /* limit the number of times through, maybe infinite */
        if (depth > MAX_FALLBACK_DEPTH)
        {
            source = NULL;
            break;
        }
Michael Smith's avatar
Michael Smith committed
169

170 171 172
        source = source_find_mount_raw(mount);
        if (source == NULL)
            break; /* fallback to missing mountpoint */
Michael Smith's avatar
Michael Smith committed
173

174 175
        if (source->running)
            break;
Michael Smith's avatar
Michael Smith committed
176

177 178 179 180 181 182 183
        /* source is not running, meaning that the fallback is not configured
           within the source, we need to check the mount list */
        mountinfo = config->mounts;
        source = NULL;
        while (mountinfo)
        {
            if (strcmp (mountinfo->mountname, mount) == 0)
Michael Smith's avatar
Michael Smith committed
184 185 186
                break;
            mountinfo = mountinfo->next;
        }
187 188
        if (mountinfo)
            mount = mountinfo->fallback_mount;
Michael Smith's avatar
Michael Smith committed
189
        else
190 191
            mount = NULL;
        depth++;
Michael Smith's avatar
Michael Smith committed
192 193
    }

194
    config_release_config();
Michael Smith's avatar
Michael Smith committed
195 196 197 198
    return source;
}


Jack Moffitt's avatar
Jack Moffitt committed
199 200
int source_compare_sources(void *arg, void *a, void *b)
{
201 202
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
203

204
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
205 206 207 208
}

int source_free_source(void *key)
{
209
    source_t *source = key;
brendan's avatar
brendan committed
210 211 212
#ifdef USE_YP
    int i;
#endif
Jack Moffitt's avatar
Jack Moffitt committed
213

214
    free(source->mount);
Michael Smith's avatar
Michael Smith committed
215
    free(source->fallback_mount);
216
    free(source->dumpfilename);
217
    client_destroy(source->client);
218 219 220
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
    source->format->free_plugin(source->format);
brendan's avatar
brendan committed
221
#ifdef USE_YP
Karl Heyes's avatar
Karl Heyes committed
222 223
    for (i=0; i<source->num_yp_directories; i++)
    {
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
224
        yp_destroy_ypdata(source->ypdata[i]);
Karl Heyes's avatar
Karl Heyes committed
225
        source->ypdata[i] = NULL;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
226
    }
227
#endif
228
    util_dict_free(source->audio_info);
229
    free(source);
Jack Moffitt's avatar
Jack Moffitt committed
230

231
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
232
}
233 234 235 236

/* Look up a listener of `source` by connection id.
 *
 * A throw-away client/connection pair carrying only the id is used as the
 * search key, since the client tree is ordered by connection id.  The tree
 * is read-locked only for the duration of the lookup.
 *
 * Returns the matching client, or NULL if there is none.
 */
client_t *source_find_client(source_t *source, int id)
{
    client_t fakeclient;
    connection_t fakecon;
    void *result = NULL;

    fakeclient.con = &fakecon;
    fakeclient.con->id = id;

    avl_tree_rlock(source->client_tree);
    if(avl_get_by_key(source->client_tree, &fakeclient, &result) == 0)
    {
        avl_tree_unlock(source->client_tree);
        return result;
    }

    avl_tree_unlock(source->client_tree);
    return NULL;
}
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268

/* Move every pending and active client of `source` onto the pending tree
 * of `dest`, then reset the listener count of `source` to zero.
 *
 * Nothing is moved when the two sources are the same object, when their
 * format types differ, or when `dest` is not running.
 */
void source_move_clients (source_t *source, source_t *dest)
{
    client_t *client;
    avl_node *node;

    if (source == dest)
    {
        /* taking the write lock on the same pending tree twice below would
         * deadlock, and there is nothing to move anyway */
        WARN1 ("source %s cannot move clients to itself, ignored", source->mount);
        return;
    }
    if (source->format->type != dest->format->type)
    {
        WARN2 ("stream %s and %s are of different types, ignored", source->mount, dest->mount);
        return;
    }
    if (dest->running == 0)
    {
        WARN1 ("source %s not running, unable to move clients ", dest->mount);
        return;
    }

    /* we don't want the two write locks to deadlock in here */
    thread_mutex_lock (&move_clients_mutex);

    /* we need to move the client and pending trees */
    avl_tree_wlock (dest->pending_tree);
    avl_tree_wlock (source->pending_tree);

    while (1)
    {
        node = avl_get_first (source->pending_tree);
        if (node == NULL)
            break;
        client = (client_t *)(node->key);
        /* no free callback: the client is handed over, not destroyed */
        avl_delete (source->pending_tree, client, NULL);

        /* TODO: reset client local format data?  */
        avl_insert (dest->pending_tree, (void *)client);
    }
    avl_tree_unlock (source->pending_tree);

    avl_tree_wlock (source->client_tree);
    while (1)
    {
        node = avl_get_first (source->client_tree);
        if (node == NULL)
            break;

        client = (client_t *)(node->key);
        avl_delete (source->client_tree, client, NULL);

        /* TODO: reset client local format data?  */
        avl_insert (dest->pending_tree, (void *)client);
    }
    source->listeners = 0;
    stats_event(source->mount, "listeners", "0");
    avl_tree_unlock (source->client_tree);
    avl_tree_unlock (dest->pending_tree);
    thread_mutex_unlock (&move_clients_mutex);
}

Jack Moffitt's avatar
Jack Moffitt committed
310 311 312

void *source_main(void *arg)
{
313
    source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
314
    source_t *fallback_source;
315 316 317 318 319 320 321 322
    char buffer[4096];
    long bytes, sbytes;
    int ret, timeout;
    client_t *client;
    avl_node *client_node;

    refbuf_t *refbuf, *abuf;
    int data_done;
Jack Moffitt's avatar
Jack Moffitt committed
323

brendan's avatar
brendan committed
324 325 326 327
#ifdef USE_YP
    char *s;
    long current_time;
    int    i;
328
    char *ai;
brendan's avatar
brendan committed
329
#endif
Jack Moffitt's avatar
Jack Moffitt committed
330

Michael Smith's avatar
Michael Smith committed
331 332 333
    long queue_limit;
    ice_config_t *config;
    char *hostname;
334 335
    char *listenurl;
    int listen_url_size;
Michael Smith's avatar
Michael Smith committed
336 337 338 339 340
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
341
    timeout = config->source_timeout;
342
    hostname = strdup(config->hostname);
Michael Smith's avatar
Michael Smith committed
343 344
    port = config->port;

brendan's avatar
brendan committed
345
#ifdef USE_YP
346 347 348 349
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
350
                strdup (config->yp_url[i]);
351
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
Michael Smith's avatar
Michael Smith committed
352
                config->yp_url_timeout[i];
353 354 355 356
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
357 358
#endif
    
Michael Smith's avatar
Michael Smith committed
359
    config_release_config();
360

361 362
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
363

364 365 366 367
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
Michael Smith's avatar
Michael Smith committed
368
    if (source_find_mount_raw(source->mount) != NULL) {
369 370 371 372
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
373 374 375 376
        global_lock();
        global.sources--;
        global_unlock();
        thread_rwlock_unlock(source->shutdown_rwlock);
377 378 379
        thread_exit(0);
        return NULL;
    }
380 381 382 383
    /* insert source onto source tree */
    avl_insert(global.source_tree, (void *)source);
    /* release write lock on global source tree */
    avl_tree_unlock(global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
384

385 386 387 388 389 390 391 392 393
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
394

395 396
    /* start off the statistics */
    source->listeners = 0;
397 398
    stats_event(source->mount, "listeners", "0");
    stats_event(source->mount, "type", source->format->format_description);
brendan's avatar
brendan committed
399
#ifdef USE_YP
400 401 402 403
    /* ice-* is icecast, icy-* is shoutcast */
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
404
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
405 406 407 408
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
    }
    if ((s = httpp_getvar(source->parser, "icy-name"))) {
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
409 410
    }
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
411 412 413 414
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
    if ((s = httpp_getvar(source->parser, "icy-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
415 416
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
417 418 419 420
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
    }
    if ((s = httpp_getvar(source->parser, "icy-genre"))) {
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
421 422
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
423 424 425 426
        add_yp_info(source, "bitrate", s, YP_BITRATE);
    }
    if ((s = httpp_getvar(source->parser, "icy-br"))) {
        add_yp_info(source, "bitrate", s, YP_BITRATE);
427 428
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
429
        add_yp_info(source, "server_description", s, YP_SERVER_DESC);
430
    }
431
    if ((s = httpp_getvar(source->parser, "ice-public"))) {
432
        stats_event(source->mount, "public", s);
433 434 435 436 437
        source->yp_public = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "icy-pub"))) {
        stats_event(source->mount, "public", s);
        source->yp_public = atoi(s);
438 439
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
440
        stats_event(source->mount, "audio_info", s);
441 442
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
443
            add_yp_info(source, "audio_info", 
444 445
                    ai,
                    YP_AUDIO_INFO);
446 447 448
            if (ai) {
                free(ai);
            }
449
        }
450 451
    }
    for (i=0;i<source->num_yp_directories;i++) {
452
        add_yp_info(source, "server_type", 
453 454
                     source->format->format_description,
                     YP_SERVER_TYPE);
455 456 457 458 459
        if (source->ypdata[i]->listen_url) {
            free(source->ypdata[i]->listen_url);
        }
        /* 6 for max size of port */
        listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
460
            strlen(hostname) + 
461
            strlen(":") + 6 + strlen(source->mount) + 1;
462 463
        source->ypdata[i]->listen_url = malloc(listen_url_size);
        sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
464
                hostname, port, source->mount);
465
    }
466

467
    if(source->yp_public) {
468

469
        current_time = time(NULL);
470

471
        for (i=0;i<source->num_yp_directories;i++) {
472 473
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
474
            /* Don't permit touch intervals of less than 30 seconds */
475 476 477
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
478
            source->ypdata[i]->yp_last_touch = 0;
479
        }
480
    }
481
#endif
482 483 484 485 486 487 488 489 490 491 492 493 494 495
    /* 6 for max size of port */
    listen_url_size = strlen("http://") + 
    strlen(hostname) + strlen(":") + 6 + strlen(source->mount) + 1;
    
    listenurl = malloc(listen_url_size);
    memset(listenurl, '\000', listen_url_size);
    sprintf(listenurl, "http://%s:%d%s", hostname, port, source->mount);
    stats_event(source->mount, "listenurl", listenurl);
    if (hostname) {
        free(hostname);
    }
    if (listenurl) {
        free(listenurl);
    }
496

497 498
    DEBUG0("Source creation complete");

Michael Smith's avatar
Michael Smith committed
499 500 501 502 503 504 505 506 507 508 509
    /*
    ** Now, if we have a fallback source and override is on, we want
    ** to steal it's clients, because it means we've come back online
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

    if(source->fallback_override && source->fallback_mount) {
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);

510 511
        if (fallback_source)
            source_move_clients (fallback_source, source);
Michael Smith's avatar
Michael Smith committed
512

513
        avl_tree_unlock(global.source_tree);
Michael Smith's avatar
Michael Smith committed
514 515
    }

516 517
    while (global.running == ICE_RUNNING && source->running) {
        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
518 519 520 521
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
522
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
523 524 525
        while (refbuf == NULL) {
            bytes = 0;
            while (bytes <= 0) {
526 527
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

528 529
                if (ret < 0 && sock_recoverable (sock_error()))
                   continue;
530
                if (ret <= 0) { /* timeout expired */
531 532
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
533 534 535
                    bytes = 0;
                    break;
                }
Jack Moffitt's avatar
Jack Moffitt committed
536

537
                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
Michael Smith's avatar
Michael Smith committed
538 539 540
                if (bytes == 0 || 
                        (bytes < 0 && !sock_recoverable(sock_error()))) 
                {
541 542
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
543
                    break;
544
                }
545 546
            }
            if (bytes <= 0) break;
547
            source->client->con->sent_bytes += bytes;
Michael Smith's avatar
Michael Smith committed
548 549
            ret = source->format->get_buffer(source->format, buffer, bytes, 
                    &refbuf);
Michael Smith's avatar
Michael Smith committed
550 551 552 553
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571
        }

        if (bytes <= 0) {
            INFO0("Removing source following disconnection");
            break;
        }

        /* we have a refbuf buffer, which a data block to be sent to 
        ** all clients.  if a client is not able to send the buffer
        ** immediately, it should store it on its queue for the next
        ** go around.
        **
        ** instead of sending the current block, a client should send
        ** all data in the queue, plus the current block, until either
        ** it runs out of data, or it hits a recoverable error like
        ** EAGAIN.  this will allow a client that got slightly lagged
        ** to catch back up if it can
        */
Jack Moffitt's avatar
Jack Moffitt committed
572

Michael Smith's avatar
Michael Smith committed
573 574 575 576 577 578 579 580 581 582 583 584
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

585 586
        /* acquire read lock on client_tree */
        avl_tree_rlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
587

588 589 590 591
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            /* acquire read lock on node */
            avl_node_wlock(client_node);
Jack Moffitt's avatar
Jack Moffitt committed
592

593 594 595
            client = (client_t *)client_node->key;
            
            data_done = 0;
Jack Moffitt's avatar
Jack Moffitt committed
596

597 598 599 600 601 602 603
            /* do we have any old buffers? */
            abuf = refbuf_queue_remove(&client->queue);
            while (abuf) {
                if (client->pos > 0)
                    bytes = abuf->len - client->pos;
                else
                    bytes = abuf->len;
Jack Moffitt's avatar
Jack Moffitt committed
604

605
                sbytes = source->format->write_buf_to_client(source->format,
606
                        client, &abuf->data[client->pos], bytes);
607
                if (sbytes >= 0) {
608
                    if(sbytes != bytes) {
609 610 611
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
612 613 614 615 616 617
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
618
                else {
Michael Smith's avatar
Michael Smith committed
619 620
                    DEBUG0("Client has unrecoverable error catching up. "
                            "Client has probably disconnected");
621
                    client->con->error = 1;
622
                    data_done = 1;
623
                    refbuf_release(abuf);
624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639
                    break;
                }
                
                /* we're done with that refbuf, release it and reset the pos */
                refbuf_release(abuf);
                client->pos = 0;

                abuf = refbuf_queue_remove(&client->queue);
            }
            
            /* now send or queue the new data */
            if (data_done) {
                refbuf_addref(refbuf);
                refbuf_queue_add(&client->queue, refbuf);
            } else {
                sbytes = source->format->write_buf_to_client(source->format,
640
                        client, refbuf->data, refbuf->len);
641
                if (sbytes >= 0) {
642 643
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
644
                        client->pos = sbytes;
645
                        refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
646
                        refbuf_queue_insert(&client->queue, refbuf);
647 648
                    }
                }
649
                else {
Michael Smith's avatar
Michael Smith committed
650 651
                    DEBUG0("Client had unrecoverable error with new data, "
                            "probably due to client disconnection");
652
                    client->con->error = 1;
653 654 655 656 657 658 659 660
                }
            }

            /* if the client is too slow, its queue will slowly build up.
            ** we need to make sure the client is keeping up with the
            ** data, so we'll kick any client who's queue gets to large.
            */
            if (refbuf_queue_length(&client->queue) > queue_limit) {
661
                DEBUG0("Client has fallen too far behind, removing");
662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685
                client->con->error = 1;
            }

            /* release read lock on node */
            avl_node_unlock(client_node);

            /* get the next node */
            client_node = avl_get_next(client_node);
        }
        /* release read lock on client_tree */
        avl_tree_unlock(source->client_tree);

        refbuf_release(refbuf);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** delete bad clients **/
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
            if (client->con->error) {
                client_node = avl_get_next(client_node);
                avl_delete(source->client_tree, (void *)client, _free_client);
Michael Smith's avatar
Michael Smith committed
686 687 688
                source->listeners--;
                stats_event_args(source->mount, "listeners", "%d", 
                        source->listeners);
689
                DEBUG0("Client removed");
690 691 692 693 694 695 696 697 698 699 700
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
Michael Smith's avatar
Michael Smith committed
701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718
            if(source->max_listeners != -1 && 
                    source->listeners >= source->max_listeners) 
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                client = (client_t *)client_node->key;
                client_node = avl_get_next(client_node);
                avl_delete(source->pending_tree, (void *)client, _free_client);

                INFO0("Client deleted, exceeding maximum listeners for this "
                        "mountpoint.");
                continue;
            }
            
            /* Otherwise, the client is accepted, add it */
719
            avl_insert(source->client_tree, client_node->key);
Michael Smith's avatar
Michael Smith committed
720 721

            source->listeners++;
722
            DEBUG0("Client added");
723 724
            stats_event_inc(NULL, "clients");
            stats_event_inc(source->mount, "connections");
Michael Smith's avatar
Michael Smith committed
725 726
            stats_event_args(source->mount, "listeners", "%d", 
                    source->listeners);
727 728 729 730 731 732 733 734 735 736 737 738 739 740

            /* we have to send cached headers for some data formats
            ** this is where we queue up the buffers to send
            */
            if (source->format->has_predata) {
                client = (client_t *)client_node->key;
                client->queue = source->format->get_predata(source->format);
            }

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
741 742 743
            avl_delete(source->pending_tree, 
                    avl_get_first(source->pending_tree)->key, 
                    source_remove_client);
744 745 746 747 748 749 750 751
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
752

Michael Smith's avatar
Michael Smith committed
753 754
done:

755
    INFO1("Source \"%s\" exiting", source->mount);
756

brendan's avatar
brendan committed
757
#ifdef USE_YP
758
    if(source->yp_public) {
759 760
        yp_remove(source);
    }
761 762
#endif
    
763 764 765 766
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
767
    avl_tree_wlock(global.source_tree);
768 769
    fallback_source = source_find_mount (source->fallback_mount);
    avl_delete (global.source_tree, source, NULL);
Michael Smith's avatar
Michael Smith committed
770

771 772
    if (fallback_source != NULL)
        source_move_clients (source, fallback_source);
Michael Smith's avatar
Michael Smith committed
773

774
    avl_tree_unlock (global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
775

776 777 778
    /* delete this sources stats */
    stats_event_dec(NULL, "sources");
    stats_event(source->mount, "listeners", NULL);
Jack Moffitt's avatar
Jack Moffitt committed
779

780 781 782
    global_lock();
    global.sources--;
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
783

Michael Smith's avatar
Michael Smith committed
784 785 786
    if(source->dumpfile)
        fclose(source->dumpfile);

787 788
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
789

790 791
    source_free_source(source);

792
    thread_exit(0);
Jack Moffitt's avatar
Jack Moffitt committed
793
      
794
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
795 796 797 798
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
799 800 801 802 803
    client_t *clienta = (client_t *)a;
    client_t *clientb = (client_t *)b;

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
804

805 806
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
807

808
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
809 810
}

811
/* avl "free" callback that deliberately frees nothing: it is used when a
 * client is removed from a tree without being destroyed.  `key` is ignored.
 * Always returns 1.
 */
int source_remove_client(void *key)
{
    (void)key;
    return 1;
}

static int _free_client(void *key)
{
818 819 820 821 822 823 824 825 826 827
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");
    
    client_destroy(client);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
828
}
829

830 831
/* Parse an ice-audio-info header value of the form "name=value;name=value".
 *
 * Each pair with a non-empty value is URL-unescaped, stored in
 * source->audio_info and published as a statistic on the source's mount.
 * Tokens without '=' and empty tokens are ignored.  The string `s` is
 * modified in place: each ';' is overwritten with a terminator.
 *
 * The string is split by hand instead of with strtok(), which keeps hidden
 * static state and so is unsafe when several source threads parse at once.
 *
 * Returns 1 on success, 0 if memory for a variable name could not be
 * allocated.
 */
static int _parse_audio_info(source_t *source, char *s)
{
    char *cursor = s;
    char *token = NULL;
    char *end = NULL;
    char *pvar = NULL;
    char *variable = NULL;
    char *value = NULL;
    size_t namelen;

    while (cursor != NULL && *cursor != '\0') {
        /* cut the next ';'-delimited token out of the string */
        token = cursor;
        end = strchr(token, ';');
        if (end) {
            *end = '\0';
            cursor = end + 1;
        } else {
            cursor = NULL;
        }

        /* like strtok(), skip empty tokens produced by ";;" or a leading ';' */
        if (*token == '\0')
            continue;

        pvar = strchr(token, '=');
        if (pvar == NULL)
            continue;

        namelen = (size_t)(pvar - token);
        variable = malloc(namelen + 1);
        if (variable == NULL)
            return 0;
        memcpy(variable, token, namelen);
        variable[namelen] = 0;

        pvar++;
        if (strlen(pvar)) {
            value = util_url_unescape(pvar);
            /* the unescape may hand back NULL; skip the pair rather than
             * passing a NULL value on */
            if (value) {
                util_dict_set(source->audio_info, variable, value);
                stats_event(source->mount, variable, value);
                free(value);
            }
        }
        free(variable);
    }
    return 1;
}