source.c 26 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org, 
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
18 19 20 21
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
22
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
23
#include <errno.h>
24 25

#ifndef _WIN32
26
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
27 28
#include <sys/time.h>
#include <sys/socket.h>
29
#else
30 31
#include <winsock2.h>
#include <windows.h>
32
#endif
Jack Moffitt's avatar
Jack Moffitt committed
33

Karl Heyes's avatar
Karl Heyes committed
34 35 36 37
#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
38 39 40 41 42 43

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
44
#include "logging.h"
45
#include "cfgfile.h"
46
#include "util.h"
brendan's avatar
brendan committed
47
#ifdef USE_YP
48
#include "geturl.h"
49
#endif
Jack Moffitt's avatar
Jack Moffitt committed
50
#include "source.h"
Michael Smith's avatar
Michael Smith committed
51
#include "format.h"
Michael Smith's avatar
Michael Smith committed
52
#include "auth.h"
Jack Moffitt's avatar
Jack Moffitt committed
53

54 55 56
#undef CATMODULE
#define CATMODULE "source"

Michael Smith's avatar
Michael Smith committed
57 58
#define MAX_FALLBACK_DEPTH 10

59 60
mutex_t move_clients_mutex;

Jack Moffitt's avatar
Jack Moffitt committed
61 62 63
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
64
static int _parse_audio_info(source_t *source, char *s);
Jack Moffitt's avatar
Jack Moffitt committed
65

66
source_t *source_create(client_t *client, connection_t *con, 
Michael Smith's avatar
Michael Smith committed
67 68
    http_parser_t *parser, const char *mount, format_type_t type, 
    mount_proxy *mountinfo)
Jack Moffitt's avatar
Jack Moffitt committed
69
{
70
    source_t *src;
Jack Moffitt's avatar
Jack Moffitt committed
71

72
    src = (source_t *)malloc(sizeof(source_t));
73
    src->client = client;
74
    src->mount = (char *)strdup(mount);
Michael Smith's avatar
Michael Smith committed
75
    src->fallback_mount = NULL;
76 77 78 79 80
    src->format = format_get_plugin(type, src->mount, parser);
    src->con = con;
    src->parser = parser;
    src->client_tree = avl_tree_new(_compare_clients, NULL);
    src->pending_tree = avl_tree_new(_compare_clients, NULL);
81
    src->running = 1;
82 83
    src->num_yp_directories = 0;
    src->listeners = 0;
84
    src->max_listeners = -1;
85
    src->send_return = 0;
Michael Smith's avatar
Michael Smith committed
86 87
    src->dumpfilename = NULL;
    src->dumpfile = NULL;
88
    src->audio_info = util_dict_new();
89
    src->yp_public = 0;
Michael Smith's avatar
Michael Smith committed
90 91
    src->fallback_override = 0;
    src->no_mount = 0;
92
    src->authenticator = NULL;
Jack Moffitt's avatar
Jack Moffitt committed
93

Michael Smith's avatar
Michael Smith committed
94
    if(mountinfo != NULL) {
95 96
        if (mountinfo->fallback_mount != NULL)
            src->fallback_mount = strdup (mountinfo->fallback_mount);
Michael Smith's avatar
Michael Smith committed
97
        src->max_listeners = mountinfo->max_listeners;
98 99
        if (mountinfo->dumpfile != NULL)
            src->dumpfilename = strdup (mountinfo->dumpfile);
Michael Smith's avatar
Michael Smith committed
100 101 102 103 104
        if(mountinfo->auth_type != NULL)
            src->authenticator = auth_get_authenticator(
                    mountinfo->auth_type, mountinfo->auth_options);
        src->fallback_override = mountinfo->fallback_override;
        src->no_mount = mountinfo->no_mount;
Michael Smith's avatar
Michael Smith committed
105 106 107 108 109 110 111 112 113 114
    }

    if(src->dumpfilename != NULL) {
        src->dumpfile = fopen(src->dumpfilename, "ab");
        if(src->dumpfile == NULL) {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    src->dumpfilename, strerror(errno));
        }
    }

115
    return src;
Jack Moffitt's avatar
Jack Moffitt committed
116 117
}

Michael Smith's avatar
Michael Smith committed
118 119 120 121
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
122
{
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
146 147
}

148 149

/* Search for mount, if the mount is there but not currently running then
Karl Heyes's avatar
Karl Heyes committed
150
 * check the fallback, and so on.  Must have a global source lock to call
151 152 153
 * this function.
 */
source_t *source_find_mount (const char *mount)
Michael Smith's avatar
Michael Smith committed
154
{
155
    source_t *source = NULL;
Michael Smith's avatar
Michael Smith committed
156
    ice_config_t *config;
157 158 159 160 161 162 163 164 165 166 167 168
    mount_proxy *mountinfo;
    int depth = 0;

    config = config_get_config();
    while (mount != NULL)
    {
        /* limit the number of times through, maybe infinite */
        if (depth > MAX_FALLBACK_DEPTH)
        {
            source = NULL;
            break;
        }
Michael Smith's avatar
Michael Smith committed
169

170 171 172
        source = source_find_mount_raw(mount);
        if (source == NULL)
            break; /* fallback to missing mountpoint */
Michael Smith's avatar
Michael Smith committed
173

174 175
        if (source->running)
            break;
Michael Smith's avatar
Michael Smith committed
176

177 178 179 180 181 182 183
        /* source is not running, meaning that the fallback is not configured
           within the source, we need to check the mount list */
        mountinfo = config->mounts;
        source = NULL;
        while (mountinfo)
        {
            if (strcmp (mountinfo->mountname, mount) == 0)
Michael Smith's avatar
Michael Smith committed
184 185 186
                break;
            mountinfo = mountinfo->next;
        }
187 188
        if (mountinfo)
            mount = mountinfo->fallback_mount;
Michael Smith's avatar
Michael Smith committed
189
        else
190 191
            mount = NULL;
        depth++;
Michael Smith's avatar
Michael Smith committed
192 193
    }

194
    config_release_config();
Michael Smith's avatar
Michael Smith committed
195 196 197 198
    return source;
}


Jack Moffitt's avatar
Jack Moffitt committed
199 200
int source_compare_sources(void *arg, void *a, void *b)
{
201 202
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
203

204
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
205 206 207 208
}

int source_free_source(void *key)
{
209
    source_t *source = key;
brendan's avatar
brendan committed
210 211 212
#ifdef USE_YP
    int i;
#endif
Jack Moffitt's avatar
Jack Moffitt committed
213

214
    free(source->mount);
Michael Smith's avatar
Michael Smith committed
215
    free(source->fallback_mount);
216
    free(source->dumpfilename);
217
    client_destroy(source->client);
218 219 220
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
    source->format->free_plugin(source->format);
brendan's avatar
brendan committed
221
#ifdef USE_YP
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
222 223 224
    for (i=0; i<source->num_yp_directories; i++) {
        yp_destroy_ypdata(source->ypdata[i]);
    }
225
#endif
226
    util_dict_free(source->audio_info);
227
    free(source);
Jack Moffitt's avatar
Jack Moffitt committed
228

229
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
230
}
231 232 233 234

client_t *source_find_client(source_t *source, int id)
{
    client_t fakeclient;
235
    void *result;
236 237 238 239 240 241
    connection_t fakecon;

    fakeclient.con = &fakecon;
    fakeclient.con->id = id;

    avl_tree_rlock(source->client_tree);
242
    if(avl_get_by_key(source->client_tree, &fakeclient, &result) == 0)
243 244 245 246 247 248 249 250
    {
        avl_tree_unlock(source->client_tree);
        return result;
    }

    avl_tree_unlock(source->client_tree);
    return NULL;
}
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266

void source_move_clients (source_t *source, source_t *dest)
{
    client_t *client;
    avl_node *node;

    if (source->format->type != dest->format->type)
    {
        WARN2 ("stream %s and %s are of different types, ignored", source->mount, dest->mount);
        return;
    }
    if (dest->running == 0)
    {
        WARN1 ("source %s not running, unable to move clients ", dest->mount);
        return;
    }
267
    
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
    /* we don't want the two write locks to deadlock in here */
    thread_mutex_lock (&move_clients_mutex);

    /* we need to move the client and pending trees */
    avl_tree_wlock (dest->pending_tree);
    avl_tree_wlock (source->pending_tree);

    while (1)
    {
        node = avl_get_first (source->pending_tree);
        if (node == NULL)
            break;
        client = (client_t *)(node->key);
        avl_delete (source->pending_tree, client, NULL);

        /* TODO: reset client local format data?  */
        avl_insert (dest->pending_tree, (void *)client);
    }
    avl_tree_unlock (source->pending_tree);

    avl_tree_wlock (source->client_tree);
    while (1)
    {
        node = avl_get_first (source->client_tree);
        if (node == NULL)
            break;

        client = (client_t *)(node->key);
        avl_delete (source->client_tree, client, NULL);

        /* TODO: reset client local format data?  */
        avl_insert (dest->pending_tree, (void *)client);
    }
    source->listeners = 0;
    stats_event(source->mount, "listeners", "0");
    avl_tree_unlock (source->client_tree);
    avl_tree_unlock (dest->pending_tree);
    thread_mutex_unlock (&move_clients_mutex);
}

Jack Moffitt's avatar
Jack Moffitt committed
308 309 310

void *source_main(void *arg)
{
311
    source_t *source = (source_t *)arg;
Michael Smith's avatar
Michael Smith committed
312
    source_t *fallback_source;
313 314 315 316 317 318 319 320
    char buffer[4096];
    long bytes, sbytes;
    int ret, timeout;
    client_t *client;
    avl_node *client_node;

    refbuf_t *refbuf, *abuf;
    int data_done;
Jack Moffitt's avatar
Jack Moffitt committed
321

brendan's avatar
brendan committed
322 323 324 325
#ifdef USE_YP
    char *s;
    long current_time;
    int    i;
326
    char *ai;
brendan's avatar
brendan committed
327
#endif
Jack Moffitt's avatar
Jack Moffitt committed
328

Michael Smith's avatar
Michael Smith committed
329 330 331
    long queue_limit;
    ice_config_t *config;
    char *hostname;
332 333
    char *listenurl;
    int listen_url_size;
Michael Smith's avatar
Michael Smith committed
334 335 336 337 338
    int port;

    config = config_get_config();
    
    queue_limit = config->queue_size_limit;
339
    timeout = config->source_timeout;
340
    hostname = strdup(config->hostname);
Michael Smith's avatar
Michael Smith committed
341 342
    port = config->port;

brendan's avatar
brendan committed
343
#ifdef USE_YP
344 345 346 347
    for (i=0;i<config->num_yp_directories;i++) {
        if (config->yp_url[i]) {
            source->ypdata[source->num_yp_directories] = yp_create_ypdata();
            source->ypdata[source->num_yp_directories]->yp_url = 
348
                strdup (config->yp_url[i]);
349
            source->ypdata[source->num_yp_directories]->yp_url_timeout = 
Michael Smith's avatar
Michael Smith committed
350
                config->yp_url_timeout[i];
351 352 353 354
            source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
            source->num_yp_directories++;
        }
    }
355 356
#endif
    
Michael Smith's avatar
Michael Smith committed
357
    config_release_config();
358

359 360
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
361

362 363 364 365
    avl_tree_wlock(global.source_tree);
    /* Now, we must do a final check with write lock taken out that the
     * mountpoint is available..
     */
Michael Smith's avatar
Michael Smith committed
366
    if (source_find_mount_raw(source->mount) != NULL) {
367 368 369 370
        avl_tree_unlock(global.source_tree);
        if(source->send_return) {
            client_send_404(source->client, "Mountpoint in use");
        }
371 372 373 374
        global_lock();
        global.sources--;
        global_unlock();
        thread_rwlock_unlock(source->shutdown_rwlock);
375 376 377
        thread_exit(0);
        return NULL;
    }
378 379 380 381
    /* insert source onto source tree */
    avl_insert(global.source_tree, (void *)source);
    /* release write lock on global source tree */
    avl_tree_unlock(global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
382

383 384 385 386 387 388 389 390 391
    /* If we connected successfully, we can send the message (if requested)
     * back
     */
    if(source->send_return) {
        source->client->respcode = 200;
        bytes = sock_write(source->client->con->sock, 
                "HTTP/1.0 200 OK\r\n\r\n");
        if(bytes > 0) source->client->con->sent_bytes = bytes;
    }
Jack Moffitt's avatar
Jack Moffitt committed
392

393 394
    /* start off the statistics */
    source->listeners = 0;
395 396
    stats_event(source->mount, "listeners", "0");
    stats_event(source->mount, "type", source->format->format_description);
brendan's avatar
brendan committed
397
#ifdef USE_YP
398 399 400 401
    /* ice-* is icecast, icy-* is shoutcast */
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
402
    if ((s = httpp_getvar(source->parser, "ice-name"))) {
403 404 405 406
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
    }
    if ((s = httpp_getvar(source->parser, "icy-name"))) {
        add_yp_info(source, "server_name", s, YP_SERVER_NAME);
407 408
    }
    if ((s = httpp_getvar(source->parser, "ice-url"))) {
409 410 411 412
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
    }
    if ((s = httpp_getvar(source->parser, "icy-url"))) {
        add_yp_info(source, "server_url", s, YP_SERVER_URL);
413 414
    }
    if ((s = httpp_getvar(source->parser, "ice-genre"))) {
415 416 417 418
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
    }
    if ((s = httpp_getvar(source->parser, "icy-genre"))) {
        add_yp_info(source, "genre", s, YP_SERVER_GENRE);
419 420
    }
    if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
421 422 423 424
        add_yp_info(source, "bitrate", s, YP_BITRATE);
    }
    if ((s = httpp_getvar(source->parser, "icy-br"))) {
        add_yp_info(source, "bitrate", s, YP_BITRATE);
425 426
    }
    if ((s = httpp_getvar(source->parser, "ice-description"))) {
427
        add_yp_info(source, "server_description", s, YP_SERVER_DESC);
428
    }
429
    if ((s = httpp_getvar(source->parser, "ice-public"))) {
430
        stats_event(source->mount, "public", s);
431 432 433 434 435
        source->yp_public = atoi(s);
    }
    if ((s = httpp_getvar(source->parser, "icy-pub"))) {
        stats_event(source->mount, "public", s);
        source->yp_public = atoi(s);
436 437
    }
    if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
438
        stats_event(source->mount, "audio_info", s);
439 440
        if (_parse_audio_info(source, s)) {
            ai = util_dict_urlencode(source->audio_info, '&');
441
            add_yp_info(source, "audio_info", 
442 443
                    ai,
                    YP_AUDIO_INFO);
444 445 446
            if (ai) {
                free(ai);
            }
447
        }
448 449
    }
    for (i=0;i<source->num_yp_directories;i++) {
450
        add_yp_info(source, "server_type", 
451 452
                     source->format->format_description,
                     YP_SERVER_TYPE);
453 454 455 456 457
        if (source->ypdata[i]->listen_url) {
            free(source->ypdata[i]->listen_url);
        }
        /* 6 for max size of port */
        listen_url_size = strlen("http://") + 
Michael Smith's avatar
Michael Smith committed
458
            strlen(hostname) + 
459
            strlen(":") + 6 + strlen(source->mount) + 1;
460 461
        source->ypdata[i]->listen_url = malloc(listen_url_size);
        sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s", 
Michael Smith's avatar
Michael Smith committed
462
                hostname, port, source->mount);
463
    }
464

465
    if(source->yp_public) {
466

467
        current_time = time(NULL);
468

469
        for (i=0;i<source->num_yp_directories;i++) {
470 471
            /* Give the source 5 seconds to update the metadata
               before we do our first touch */
472
            /* Don't permit touch intervals of less than 30 seconds */
473 474 475
            if (source->ypdata[i]->yp_touch_interval <= 30) {
                source->ypdata[i]->yp_touch_interval = 30;
            }
476
            source->ypdata[i]->yp_last_touch = 0;
477
        }
478
    }
479
#endif
480 481 482 483 484 485 486 487 488 489 490 491 492 493
    /* 6 for max size of port */
    listen_url_size = strlen("http://") + 
    strlen(hostname) + strlen(":") + 6 + strlen(source->mount) + 1;
    
    listenurl = malloc(listen_url_size);
    memset(listenurl, '\000', listen_url_size);
    sprintf(listenurl, "http://%s:%d%s", hostname, port, source->mount);
    stats_event(source->mount, "listenurl", listenurl);
    if (hostname) {
        free(hostname);
    }
    if (listenurl) {
        free(listenurl);
    }
494

495 496
    DEBUG0("Source creation complete");

Michael Smith's avatar
Michael Smith committed
497 498 499 500 501 502 503 504 505 506 507
    /*
    ** Now, if we have a fallback source and override is on, we want
    ** to steal it's clients, because it means we've come back online
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

    if(source->fallback_override && source->fallback_mount) {
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);

508 509
        if (fallback_source)
            source_move_clients (fallback_source, source);
Michael Smith's avatar
Michael Smith committed
510

511
        avl_tree_unlock(global.source_tree);
Michael Smith's avatar
Michael Smith committed
512 513
    }

514 515
    while (global.running == ICE_RUNNING && source->running) {
        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
Michael Smith's avatar
Michael Smith committed
516 517 518 519
        if(ret < 0) {
            WARN0("Bad data from source");
            break;
        }
520
        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
521 522 523
        while (refbuf == NULL) {
            bytes = 0;
            while (bytes <= 0) {
524 525
                ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);

526 527
                if (ret < 0 && sock_recoverable (sock_error()))
                   continue;
528
                if (ret <= 0) { /* timeout expired */
529 530
                    WARN1("Disconnecting source: socket timeout (%d s) expired",
                           timeout);
531 532 533
                    bytes = 0;
                    break;
                }
Jack Moffitt's avatar
Jack Moffitt committed
534

535
                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
Michael Smith's avatar
Michael Smith committed
536 537 538
                if (bytes == 0 || 
                        (bytes < 0 && !sock_recoverable(sock_error()))) 
                {
539 540
                    DEBUG1("Disconnecting source due to socket read error: %s",
                            strerror(sock_error()));
541
                    break;
542
                }
543 544
            }
            if (bytes <= 0) break;
545
            source->client->con->sent_bytes += bytes;
Michael Smith's avatar
Michael Smith committed
546 547
            ret = source->format->get_buffer(source->format, buffer, bytes, 
                    &refbuf);
Michael Smith's avatar
Michael Smith committed
548 549 550 551
            if(ret < 0) {
                WARN0("Bad data from source");
                goto done;
            }
552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
        }

        if (bytes <= 0) {
            INFO0("Removing source following disconnection");
            break;
        }

        /* we have a refbuf buffer, which a data block to be sent to 
        ** all clients.  if a client is not able to send the buffer
        ** immediately, it should store it on its queue for the next
        ** go around.
        **
        ** instead of sending the current block, a client should send
        ** all data in the queue, plus the current block, until either
        ** it runs out of data, or it hits a recoverable error like
        ** EAGAIN.  this will allow a client that got slightly lagged
        ** to catch back up if it can
        */
Jack Moffitt's avatar
Jack Moffitt committed
570

Michael Smith's avatar
Michael Smith committed
571 572 573 574 575 576 577 578 579 580 581 582
        /* First, stream dumping, if enabled */
        if(source->dumpfile) {
            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
                    refbuf->len) 
            {
                WARN1("Write to dump file failed, disabling: %s", 
                        strerror(errno));
                fclose(source->dumpfile);
                source->dumpfile = NULL;
            }
        }

583 584
        /* acquire read lock on client_tree */
        avl_tree_rlock(source->client_tree);
Jack Moffitt's avatar
Jack Moffitt committed
585

586 587 588 589
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            /* acquire read lock on node */
            avl_node_wlock(client_node);
Jack Moffitt's avatar
Jack Moffitt committed
590

591 592 593
            client = (client_t *)client_node->key;
            
            data_done = 0;
Jack Moffitt's avatar
Jack Moffitt committed
594

595 596 597 598 599 600 601
            /* do we have any old buffers? */
            abuf = refbuf_queue_remove(&client->queue);
            while (abuf) {
                if (client->pos > 0)
                    bytes = abuf->len - client->pos;
                else
                    bytes = abuf->len;
Jack Moffitt's avatar
Jack Moffitt committed
602

603
                sbytes = source->format->write_buf_to_client(source->format,
604
                        client, &abuf->data[client->pos], bytes);
605
                if (sbytes >= 0) {
606
                    if(sbytes != bytes) {
607 608 609
                        /* We didn't send the entire buffer. Leave it for
                         * the moment, handle it in the next iteration.
                         */
610 611 612 613 614 615
                        client->pos += sbytes;
                        refbuf_queue_insert(&client->queue, abuf);
                        data_done = 1;
                        break;
                    }
                }
616
                else {
Michael Smith's avatar
Michael Smith committed
617 618
                    DEBUG0("Client has unrecoverable error catching up. "
                            "Client has probably disconnected");
619
                    client->con->error = 1;
620
                    data_done = 1;
621
                    refbuf_release(abuf);
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637
                    break;
                }
                
                /* we're done with that refbuf, release it and reset the pos */
                refbuf_release(abuf);
                client->pos = 0;

                abuf = refbuf_queue_remove(&client->queue);
            }
            
            /* now send or queue the new data */
            if (data_done) {
                refbuf_addref(refbuf);
                refbuf_queue_add(&client->queue, refbuf);
            } else {
                sbytes = source->format->write_buf_to_client(source->format,
638
                        client, refbuf->data, refbuf->len);
639
                if (sbytes >= 0) {
640 641
                    if(sbytes != refbuf->len) {
                        /* Didn't send the entire buffer, queue it */
642
                        client->pos = sbytes;
643
                        refbuf_addref(refbuf);
Michael Smith's avatar
Michael Smith committed
644
                        refbuf_queue_insert(&client->queue, refbuf);
645 646
                    }
                }
647
                else {
Michael Smith's avatar
Michael Smith committed
648 649
                    DEBUG0("Client had unrecoverable error with new data, "
                            "probably due to client disconnection");
650
                    client->con->error = 1;
651 652 653 654 655 656 657 658
                }
            }

            /* if the client is too slow, its queue will slowly build up.
            ** we need to make sure the client is keeping up with the
            ** data, so we'll kick any client who's queue gets to large.
            */
            if (refbuf_queue_length(&client->queue) > queue_limit) {
659
                DEBUG0("Client has fallen too far behind, removing");
660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683
                client->con->error = 1;
            }

            /* release read lock on node */
            avl_node_unlock(client_node);

            /* get the next node */
            client_node = avl_get_next(client_node);
        }
        /* release read lock on client_tree */
        avl_tree_unlock(source->client_tree);

        refbuf_release(refbuf);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** delete bad clients **/
        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
            if (client->con->error) {
                client_node = avl_get_next(client_node);
                avl_delete(source->client_tree, (void *)client, _free_client);
Michael Smith's avatar
Michael Smith committed
684 685 686
                source->listeners--;
                stats_event_args(source->mount, "listeners", "%d", 
                        source->listeners);
687
                DEBUG0("Client removed");
688 689 690 691 692 693 694 695 696 697 698
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
Michael Smith's avatar
Michael Smith committed
699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716
            if(source->max_listeners != -1 && 
                    source->listeners >= source->max_listeners) 
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                client = (client_t *)client_node->key;
                client_node = avl_get_next(client_node);
                avl_delete(source->pending_tree, (void *)client, _free_client);

                INFO0("Client deleted, exceeding maximum listeners for this "
                        "mountpoint.");
                continue;
            }
            
            /* Otherwise, the client is accepted, add it */
717
            avl_insert(source->client_tree, client_node->key);
Michael Smith's avatar
Michael Smith committed
718 719

            source->listeners++;
720
            DEBUG0("Client added");
721 722
            stats_event_inc(NULL, "clients");
            stats_event_inc(source->mount, "connections");
Michael Smith's avatar
Michael Smith committed
723 724
            stats_event_args(source->mount, "listeners", "%d", 
                    source->listeners);
725 726 727 728 729 730 731 732 733 734 735 736 737 738

            /* we have to send cached headers for some data formats
            ** this is where we queue up the buffers to send
            */
            if (source->format->has_predata) {
                client = (client_t *)client_node->key;
                client->queue = source->format->get_predata(source->format);
            }

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
739 740 741
            avl_delete(source->pending_tree, 
                    avl_get_first(source->pending_tree)->key, 
                    source_remove_client);
742 743 744 745 746 747 748 749
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
750

Michael Smith's avatar
Michael Smith committed
751 752
done:

753
    INFO1("Source \"%s\" exiting", source->mount);
754

brendan's avatar
brendan committed
755
#ifdef USE_YP
756
    if(source->yp_public) {
757 758
        yp_remove(source);
    }
759 760
#endif
    
761 762 763 764
    /* Now, we must remove this source from the source tree before
     * removing the clients, otherwise new clients can sneak into the pending
     * tree after we've cleared it
     */
765
    avl_tree_wlock(global.source_tree);
766 767
    fallback_source = source_find_mount (source->fallback_mount);
    avl_delete (global.source_tree, source, NULL);
Michael Smith's avatar
Michael Smith committed
768

769 770
    if (fallback_source != NULL)
        source_move_clients (source, fallback_source);
Michael Smith's avatar
Michael Smith committed
771

772
    avl_tree_unlock (global.source_tree);
Jack Moffitt's avatar
Jack Moffitt committed
773

774 775 776
    /* delete this sources stats */
    stats_event_dec(NULL, "sources");
    stats_event(source->mount, "listeners", NULL);
Jack Moffitt's avatar
Jack Moffitt committed
777

778 779 780
    global_lock();
    global.sources--;
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
781

Michael Smith's avatar
Michael Smith committed
782 783 784
    if(source->dumpfile)
        fclose(source->dumpfile);

785 786
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
787

788 789
    source_free_source(source);

790
    thread_exit(0);
Jack Moffitt's avatar
Jack Moffitt committed
791
      
792
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
793 794 795 796
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
797 798 799 800 801
    client_t *clienta = (client_t *)a;
    client_t *clientb = (client_t *)b;

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
802

803 804
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
805

806
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
807 808
}

809
int source_remove_client(void *key)
Jack Moffitt's avatar
Jack Moffitt committed
810
{
811
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
812 813 814 815
}

static int _free_client(void *key)
{
816 817 818 819 820 821 822 823 824 825
    client_t *client = (client_t *)key;

    global_lock();
    global.clients--;
    global_unlock();
    stats_event_dec(NULL, "clients");
    
    client_destroy(client);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
826
}
827

828 829
static int _parse_audio_info(source_t *source, char *s)
{
830 831 832 833
    char *token = NULL;
    char *pvar = NULL;
    char *variable = NULL;
    char *value = NULL;
834 835 836 837 838

    while ((token = strtok(s,";")) != NULL) {
        pvar = strchr(token, '=');
        if (pvar) {
            variable = (char *)malloc(pvar-token+1);
839
            strncpy(variable, token, pvar-token);    
840
            variable[pvar-token] = 0;
841 842
            pvar++;
            if (strlen(pvar)) {
843
                value = util_url_unescape(pvar);
844 845
                util_dict_set(source->audio_info, variable, value);
                stats_event(source->mount, variable, value);
846 847 848
                if (value) {
                    free(value);
                }
849 850
            }
            if (variable) {
851
                free(variable);
852 853 854 855 856 857
            }
        }
        s = NULL;
    }
    return 1;
}