source.c 44.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org, 
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
18 19 20 21
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
22
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
23
#include <errno.h>
24 25

#ifndef _WIN32
26
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
27
#include <sys/time.h>
28
#include <sys/socket.h>
29
#include <sys/wait.h>
30
#include <limits.h>
31
#else
32 33
#include <winsock2.h>
#include <windows.h>
34
#define snprintf _snprintf
35
#endif
Jack Moffitt's avatar
Jack Moffitt committed
36

37 38 39 40 41 42
#ifndef _WIN32
/* for __setup_empty_script_environment() */
#include <sys/stat.h>
#include <fcntl.h>
#endif

Karl Heyes's avatar
Karl Heyes committed
43 44 45 46
#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
47 48 49 50 51 52

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
53
#include "logging.h"
54
#include "cfgfile.h"
55
#include "util.h"
Jack Moffitt's avatar
Jack Moffitt committed
56
#include "source.h"
Michael Smith's avatar
Michael Smith committed
57
#include "format.h"
58
#include "fserve.h"
Michael Smith's avatar
Michael Smith committed
59
#include "auth.h"
60
#include "compat.h"
Jack Moffitt's avatar
Jack Moffitt committed
61

62 63 64
#undef CATMODULE
#define CATMODULE "source"

Michael Smith's avatar
Michael Smith committed
65 66
#define MAX_FALLBACK_DEPTH 10

67 68
mutex_t move_clients_mutex;

Jack Moffitt's avatar
Jack Moffitt committed
69 70 71
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
72
static void _parse_audio_info (source_t *source, const char *s);
Karl Heyes's avatar
Karl Heyes committed
73
static void source_shutdown (source_t *source);
74
#ifdef _WIN32
75
#define source_run_script(x,y)  ICECAST_LOG_WARN("on [dis]connect scripts disabled");
76 77 78
#else
static void source_run_script (char *command, char *mountpoint);
#endif
Jack Moffitt's avatar
Jack Moffitt committed
79

80 81 82 83 84 85 86 87
/* Allocate a new source with the stated mountpoint, if one already
 * exists with that mountpoint in the global source tree then return
 * NULL.
 */
source_t *source_reserve (const char *mount)
{
    source_t *src = NULL;

88
    if(mount[0] != '/')
89
        ICECAST_LOG_WARN("Source at \"%s\" does not start with '/', clients will be "
90 91
                "unable to connect", mount);

92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
    do
    {
        avl_tree_wlock (global.source_tree);
        src = source_find_mount_raw (mount);
        if (src)
        {
            src = NULL;
            break;
        }

        src = calloc (1, sizeof(source_t));
        if (src == NULL)
            break;

        src->client_tree = avl_tree_new(_compare_clients, NULL);
        src->pending_tree = avl_tree_new(_compare_clients, NULL);

        /* make duplicates for strings or similar */
        src->mount = strdup (mount);
        src->max_listeners = -1;
112
        thread_mutex_create(&src->lock);
113 114 115 116 117 118 119 120 121 122

        avl_insert (global.source_tree, src);

    } while (0);

    avl_tree_unlock (global.source_tree);
    return src;
}


Michael Smith's avatar
Michael Smith committed
123 124 125 126
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
127
{
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
151 152
}

153 154

/* Search for mount, if the mount is there but not currently running then
Karl Heyes's avatar
Karl Heyes committed
155
 * check the fallback, and so on.  Must have a global source lock to call
156 157 158
 * this function.
 */
source_t *source_find_mount (const char *mount)
Michael Smith's avatar
Michael Smith committed
159
{
160
    source_t *source = NULL;
Michael Smith's avatar
Michael Smith committed
161
    ice_config_t *config;
162 163 164 165
    mount_proxy *mountinfo;
    int depth = 0;

    config = config_get_config();
166
    while (mount && depth < MAX_FALLBACK_DEPTH)
167 168
    {
        source = source_find_mount_raw(mount);
Michael Smith's avatar
Michael Smith committed
169

170 171 172 173 174
        if (source)
        {
            if (source->running || source->on_demand)
                break;
        }
Michael Smith's avatar
Michael Smith committed
175

176 177 178
        /* we either have a source which is not active (relay) or no source
         * at all. Check the mounts list for fallback settings
         */
179
        mountinfo = config_find_mount (config, mount, MOUNT_TYPE_NORMAL);
180
        source = NULL;
181 182 183 184

        if (mountinfo == NULL)
            break;
        mount = mountinfo->fallback_mount;
185
        depth++;
Michael Smith's avatar
Michael Smith committed
186 187
    }

188
    config_release_config();
Michael Smith's avatar
Michael Smith committed
189 190 191 192
    return source;
}


Jack Moffitt's avatar
Jack Moffitt committed
193 194
int source_compare_sources(void *arg, void *a, void *b)
{
195 196
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
197

198
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
199 200
}

201 202 203

void source_clear_source (source_t *source)
{
204 205
    int c;

206
    ICECAST_LOG_DEBUG("clearing source \"%s\"", source->mount);
207

208
    avl_tree_wlock (source->pending_tree);
209 210
    client_destroy(source->client);
    source->client = NULL;
211 212
    source->parser = NULL;
    source->con = NULL;
213

214 215 216 217
    /* log bytes read in access log */
    if (source->client && source->format)
        source->client->con->sent_bytes = source->format->read_bytes;

218 219
    if (source->dumpfile)
    {
220
        ICECAST_LOG_INFO("Closing dumpfile for %s", source->mount);
221 222 223 224
        fclose (source->dumpfile);
        source->dumpfile = NULL;
    }

225
    /* lets kick off any clients that are left on here */
226
    avl_tree_wlock (source->client_tree);
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
    c=0;
    while (1)
    {
        avl_node *node = avl_get_first (source->client_tree);
        if (node)
        {
            client_t *client = node->key;
            if (client->respcode == 200)
                c++; /* only count clients that have had some processing */
            avl_delete (source->client_tree, client, _free_client);
            continue;
        }
        break;
    }
    if (c)
242
    {
243
        stats_event_sub (NULL, "listeners", source->listeners);
244
        ICECAST_LOG_INFO("%d active listeners on %s released", c, source->mount);
245
    }
246
    avl_tree_unlock (source->client_tree);
247 248 249 250 251 252 253 254 255 256

    while (avl_get_first (source->pending_tree))
    {
        avl_delete (source->pending_tree,
                avl_get_first(source->pending_tree)->key, _free_client);
    }

    if (source->format && source->format->free_plugin)
        source->format->free_plugin (source->format);
    source->format = NULL;
257 258 259 260 261 262

    /* Lets clear out the source queue too */
    while (source->stream_data)
    {
        refbuf_t *p = source->stream_data;
        source->stream_data = p->next;
263
        p->next = NULL;
264 265 266 267 268 269 270
        /* can be referenced by burst handler as well */
        while (p->_count > 1)
            refbuf_release (p);
        refbuf_release (p);
    }
    source->stream_data_tail = NULL;

Karl Heyes's avatar
Karl Heyes committed
271 272 273 274
    source->burst_point = NULL;
    source->burst_size = 0;
    source->burst_offset = 0;
    source->queue_size = 0;
275
    source->queue_size_limit = 0;
276 277
    source->listeners = 0;
    source->max_listeners = -1;
278
    source->prev_listeners = 0;
279
    source->hidden = 0;
280
    source->shoutcast_compat = 0;
281
    source->client_stats_update = 0;
282 283
    util_dict_free (source->audio_info);
    source->audio_info = NULL;
284 285 286 287 288 289

    free(source->fallback_mount);
    source->fallback_mount = NULL;

    free(source->dumpfilename);
    source->dumpfilename = NULL;
Karl Heyes's avatar
Karl Heyes committed
290 291 292 293 294 295

    if (source->intro_file)
    {
        fclose (source->intro_file);
        source->intro_file = NULL;
    }
296 297

    source->on_demand_req = 0;
298
    avl_tree_unlock (source->pending_tree);
299 300 301
}


302
/* Remove the provided source from the global tree and free it */
303
void source_free_source (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
304
{
305
    ICECAST_LOG_DEBUG("freeing source \"%s\"", source->mount);
306 307 308 309
    avl_tree_wlock (global.source_tree);
    avl_delete (global.source_tree, source, NULL);
    avl_tree_unlock (global.source_tree);

310 311
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
312

313 314 315
    /* make sure all YP entries have gone */
    yp_remove (source->mount);

316 317
    free (source->mount);
    free (source);
Jack Moffitt's avatar
Jack Moffitt committed
318

319
    return;
Jack Moffitt's avatar
Jack Moffitt committed
320
}
321

322

323 324 325
client_t *source_find_client(source_t *source, int id)
{
    client_t fakeclient;
326
    void *result;
327 328 329 330 331 332
    connection_t fakecon;

    fakeclient.con = &fakecon;
    fakeclient.con->id = id;

    avl_tree_rlock(source->client_tree);
333
    if(avl_get_by_key(source->client_tree, &fakeclient, &result) == 0)
334 335 336 337 338 339 340 341
    {
        avl_tree_unlock(source->client_tree);
        return result;
    }

    avl_tree_unlock(source->client_tree);
    return NULL;
}
342

343

344 345 346 347 348
/* Move clients from source to dest provided dest is running
 * and that the stream format is the same.
 * The only lock that should be held when this is called is the
 * source tree lock
 */
349 350
void source_move_clients (source_t *source, source_t *dest)
{
351
    unsigned long count = 0;
352 353
    if (strcmp (source->mount, dest->mount) == 0)
    {
354
        ICECAST_LOG_WARN("src and dst are the same \"%s\", skipping", source->mount);
355 356
        return;
    }
357 358 359 360
    /* we don't want the two write locks to deadlock in here */
    thread_mutex_lock (&move_clients_mutex);

    /* if the destination is not running then we can't move clients */
361

362
    avl_tree_wlock (dest->pending_tree);
363
    if (dest->running == 0 && dest->on_demand == 0)
364
    {
365
        ICECAST_LOG_WARN("destination mount %s not running, unable to move clients ", dest->mount);
366
        avl_tree_unlock (dest->pending_tree);
367
        thread_mutex_unlock (&move_clients_mutex);
368 369 370
        return;
    }

371
    do
372
    {
373
        client_t *client;
374

375 376
        /* we need to move the client and pending trees - we must take the
         * locks in this order to avoid deadlocks */
377
        avl_tree_wlock (source->pending_tree);
378
        avl_tree_wlock (source->client_tree);
379

380
        if (source->on_demand == 0 && source->format == NULL)
381
        {
382
            ICECAST_LOG_INFO("source mount %s is not available", source->mount);
383
            break;
384
        }
385
        if (source->format && dest->format)
386
        {
387 388
            if (source->format->type != dest->format->type)
            {
389
                ICECAST_LOG_WARN("stream %s and %s are of different types, ignored", source->mount, dest->mount);
390 391
                break;
            }
392
        }
393

394 395 396 397 398 399 400
        while (1)
        {
            avl_node *node = avl_get_first (source->pending_tree);
            if (node == NULL)
                break;
            client = (client_t *)(node->key);
            avl_delete (source->pending_tree, client, NULL);
401

402 403 404 405 406 407 408 409
            /* when switching a client to a different queue, be wary of the 
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
410 411
                if (source->con == NULL)
                    client->intro_offset = -1;
412 413
            }

414
            avl_insert (dest->pending_tree, (void *)client);
415
            count++;
416 417 418 419 420 421 422 423 424 425 426
        }

        while (1)
        {
            avl_node *node = avl_get_first (source->client_tree);
            if (node == NULL)
                break;

            client = (client_t *)(node->key);
            avl_delete (source->client_tree, client, NULL);

427 428 429 430 431 432 433 434
            /* when switching a client to a different queue, be wary of the 
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
435 436
                if (source->con == NULL)
                    client->intro_offset = -1;
437
            }
438
            avl_insert (dest->pending_tree, (void *)client);
439
            count++;
440
        }
441
        ICECAST_LOG_INFO("passing %lu listeners to \"%s\"", count, dest->mount);
442

443 444 445 446 447
        source->listeners = 0;
        stats_event (source->mount, "listeners", "0");

    } while (0);

448 449 450
    avl_tree_unlock (source->pending_tree);
    avl_tree_unlock (source->client_tree);

451 452 453 454
    /* see if we need to wake up an on-demand relay */
    if (dest->running == 0 && dest->on_demand && count)
        dest->on_demand_req = 1;

455 456 457 458
    avl_tree_unlock (dest->pending_tree);
    thread_mutex_unlock (&move_clients_mutex);
}

459

Karl Heyes's avatar
Karl Heyes committed
460 461 462 463 464 465 466 467 468 469 470
/* get some data from the source. The stream data is placed in a refbuf
 * and sent back, however NULL is also valid as in the case of a short
 * timeout and there's no data pending.
 */
static refbuf_t *get_next_buffer (source_t *source)
{
    refbuf_t *refbuf = NULL;
    int delay = 250;

    if (source->short_delay)
        delay = 0;
471
    while (global.running == ICECAST_RUNNING && source->running)
Karl Heyes's avatar
Karl Heyes committed
472
    {
473
        int fds = 0;
Karl Heyes's avatar
Karl Heyes committed
474 475
        time_t current = time (NULL);

476
        if (source->client)
477 478 479 480 481 482
            fds = util_timed_wait_for_fd (source->con->sock, delay);
        else
        {
            thread_sleep (delay*1000);
            source->last_read = current;
        }
Karl Heyes's avatar
Karl Heyes committed
483

484 485 486
        if (current >= source->client_stats_update)
        {
            stats_event_args (source->mount, "total_bytes_read",
487
                    "%"PRIu64, source->format->read_bytes);
488
            stats_event_args (source->mount, "total_bytes_sent",
489
                    "%"PRIu64, source->format->sent_bytes);
490 491
            source->client_stats_update = current + 5;
        }
Karl Heyes's avatar
Karl Heyes committed
492 493 494 495
        if (fds < 0)
        {
            if (! sock_recoverable (sock_error()))
            {
496
                ICECAST_LOG_WARN("Error while waiting on socket, Disconnecting source");
Karl Heyes's avatar
Karl Heyes committed
497 498 499 500 501 502
                source->running = 0;
            }
            break;
        }
        if (fds == 0)
        {
503 504
            thread_mutex_lock(&source->lock);
            if ((source->last_read + (time_t)source->timeout) < current)
Karl Heyes's avatar
Karl Heyes committed
505
            {
506
                ICECAST_LOG_DEBUG("last %ld, timeout %d, now %ld", (long)source->last_read,
507
                        source->timeout, (long)current);
508
                ICECAST_LOG_WARN("Disconnecting source due to socket timeout");
Karl Heyes's avatar
Karl Heyes committed
509 510
                source->running = 0;
            }
511
            thread_mutex_unlock(&source->lock);
Karl Heyes's avatar
Karl Heyes committed
512 513 514 515
            break;
        }
        source->last_read = current;
        refbuf = source->format->get_buffer (source);
516
        if (source->client->con && source->client->con->error)
517
        {
518
            ICECAST_LOG_INFO("End of Stream %s", source->mount);
519 520 521
            source->running = 0;
            continue;
        }
Karl Heyes's avatar
Karl Heyes committed
522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
        if (refbuf)
            break;
    }

    return refbuf;
}


/* general send routine per listener.  The deletion_expected tells us whether
 * the last in the queue is about to disappear, so if this client is still
 * referring to it after writing then drop the client as it's fallen too far
 * behind 
 */ 
static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
{
    int bytes;
    int loop = 10;   /* max number of iterations in one go */
    int total_written = 0;

    while (1)
    {
543 544 545 546
        /* check for limited listener time */
        if (client->con->discon_time)
            if (time(NULL) >= client->con->discon_time)
            {
547
                ICECAST_LOG_INFO("time limit reached for client #%lu", client->con->id);
548 549 550
                client->con->error = 1;
            }

Karl Heyes's avatar
Karl Heyes committed
551 552 553 554 555 556 557 558
        /* jump out if client connection has died */
        if (client->con->error)
            break;

        /* lets not send too much to one client in one go, but don't
           sleep for too long if more data can be sent */
        if (total_written > 20000 || loop == 0)
        {
559 560
            if (client->check_buffer != format_check_file_buffer)
                source->short_delay = 1;
Karl Heyes's avatar
Karl Heyes committed
561 562 563 564 565
            break;
        }

        loop--;

Karl Heyes's avatar
Karl Heyes committed
566 567 568
        if (client->check_buffer (source, client) < 0)
            break;

569
        bytes = client->write_to_client (client);
Karl Heyes's avatar
Karl Heyes committed
570 571 572 573 574
        if (bytes <= 0)
            break;  /* can't write any more */

        total_written += bytes;
    }
575
    source->format->sent_bytes += total_written;
Karl Heyes's avatar
Karl Heyes committed
576 577 578

    /* the refbuf referenced at head (last in queue) may be marked for deletion
     * if so, check to see if this client is still referring to it */
Karl Heyes's avatar
Karl Heyes committed
579
    if (deletion_expected && client->refbuf && client->refbuf == source->stream_data)
Karl Heyes's avatar
Karl Heyes committed
580
    {
581
        ICECAST_LOG_INFO("Client %lu (%s) has fallen too far behind, removing",
582 583
                client->con->id, client->con->ip);
        stats_event_inc (source->mount, "slow_listeners");
Karl Heyes's avatar
Karl Heyes committed
584 585 586 587
        client->con->error = 1;
    }
}

Jack Moffitt's avatar
Jack Moffitt committed
588

589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611
/* Open the file for stream dumping.
 * This function should do all processing of the filename.
 */
static FILE * source_open_dumpfile(const char * filename) {
#ifndef _WIN32
    /* some of the below functions seems not to be standard winapi functions */
    char buffer[PATH_MAX];
    time_t curtime;
    struct tm *loctime;

    /* Get the current time. */
    curtime = time (NULL);

    /* Convert it to local time representation. */
    loctime = localtime (&curtime);

    strftime (buffer, sizeof(buffer), filename, loctime);
    filename = buffer;
#endif

    return fopen (filename, "ab");
}

Karl Heyes's avatar
Karl Heyes committed
612 613 614
/* Perform any initialisation just before the stream data is processed, the header
 * info is processed by now and the format details are setup
 */
615
static void source_init (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
616
{
617
    ice_config_t *config = config_get_config();
618 619
    char *listenurl;
    const char *str;
620
    int listen_url_size;
621
    mount_proxy *mountinfo;
622

623
    /* 6 for max size of port */
624
    listen_url_size = strlen("http://") + strlen(config->hostname) +
625
        strlen(":") + 6 + strlen(source->mount) + 1;
626 627 628

    listenurl = malloc (listen_url_size);
    memset (listenurl, '\000', listen_url_size);
629 630
    snprintf (listenurl, listen_url_size, "http://%s:%d%s",
            config->hostname, config->port, source->mount);
631 632
    config_release_config();

633 634 635 636 637 638 639 640
    str = httpp_getvar(source->parser, "ice-audio-info");
    source->audio_info = util_dict_new();
    if (str)
    {
        _parse_audio_info (source, str);
        stats_event (source->mount, "audio_info", str);
    }

641 642
    stats_event (source->mount, "listenurl", listenurl);

Michael Smith's avatar
Michael Smith committed
643
    free(listenurl);
644

645 646
    if (source->dumpfilename != NULL)
    {
647
        source->dumpfile = source_open_dumpfile (source->dumpfilename);
648 649
        if (source->dumpfile == NULL)
        {
650
            ICECAST_LOG_WARN("Cannot open dump file \"%s\" for appending: %s, disabling.",
651 652 653 654
                    source->dumpfilename, strerror(errno));
        }
    }

655 656 657 658 659 660
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock (source->shutdown_rwlock);

    /* start off the statistics */
    source->listeners = 0;
    stats_event_inc (NULL, "source_total_connections");
661
    stats_event (source->mount, "slow_listeners", "0");
662
    stats_event_args (source->mount, "listeners", "%lu", source->listeners);
663
    stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
Karl Heyes's avatar
Karl Heyes committed
664
    stats_event_time (source->mount, "stream_start");
665
    stats_event_time_iso8601 (source->mount, "stream_start_iso8601");
666

667
    ICECAST_LOG_DEBUG("Source creation complete");
Karl Heyes's avatar
Karl Heyes committed
668
    source->last_read = time (NULL);
669
    source->prev_listeners = -1;
670
    source->running = 1;
671

672
    mountinfo = config_find_mount (config_get_config(), source->mount, MOUNT_TYPE_NORMAL);
673 674 675 676
    if (mountinfo)
    {
        if (mountinfo->on_connect)
            source_run_script (mountinfo->on_connect, source->mount);
677
        auth_stream_start (mountinfo, source->mount);
678
    }
679 680
    config_release_config();

Michael Smith's avatar
Michael Smith committed
681 682
    /*
    ** Now, if we have a fallback source and override is on, we want
683
    ** to steal its clients, because it means we've come back online
Michael Smith's avatar
Michael Smith committed
684 685 686 687
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

688 689 690 691
    if (source->fallback_override && source->fallback_mount)
    {
        source_t *fallback_source;

Michael Smith's avatar
Michael Smith committed
692 693 694
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);

695 696
        if (fallback_source)
            source_move_clients (fallback_source, source);
Michael Smith's avatar
Michael Smith committed
697

698
        avl_tree_unlock(global.source_tree);
Michael Smith's avatar
Michael Smith committed
699
    }
700 701 702 703 704
}


void source_main (source_t *source)
{
Karl Heyes's avatar
Karl Heyes committed
705
    refbuf_t *refbuf;
706 707 708 709
    client_t *client;
    avl_node *client_node;

    source_init (source);
Michael Smith's avatar
Michael Smith committed
710

711
    while (global.running == ICECAST_RUNNING && source->running) {
Karl Heyes's avatar
Karl Heyes committed
712
        int remove_from_q;
Jack Moffitt's avatar
Jack Moffitt committed
713

Karl Heyes's avatar
Karl Heyes committed
714
        refbuf = get_next_buffer (source);
715

Karl Heyes's avatar
Karl Heyes committed
716 717
        remove_from_q = 0;
        source->short_delay = 0;
718

Karl Heyes's avatar
Karl Heyes committed
719 720 721 722
        if (refbuf)
        {
            /* append buffer to the in-flight data queue,  */
            if (source->stream_data == NULL)
Michael Smith's avatar
Michael Smith committed
723
            {
Karl Heyes's avatar
Karl Heyes committed
724 725
                source->stream_data = refbuf;
                source->burst_point = refbuf;
726
            }
Karl Heyes's avatar
Karl Heyes committed
727 728 729 730 731 732 733 734 735
            if (source->stream_data_tail)
                source->stream_data_tail->next = refbuf;
            source->stream_data_tail = refbuf;
            source->queue_size += refbuf->len;
            /* new buffer is referenced for burst */
            refbuf_addref (refbuf);

            /* new data on queue, so check the burst point */
            source->burst_offset += refbuf->len;
736
            while (source->burst_offset > source->burst_size)
Karl Heyes's avatar
Karl Heyes committed
737
            {
738 739 740
                refbuf_t *to_release = source->burst_point;

                if (to_release->next)
Karl Heyes's avatar
Karl Heyes committed
741
                {
742 743 744 745
                    source->burst_point = to_release->next;
                    source->burst_offset -= to_release->len;
                    refbuf_release (to_release);
                    continue;
746
                }
747
                break;
748 749
            }

Karl Heyes's avatar
Karl Heyes committed
750 751 752
            /* save stream to file */
            if (source->dumpfile && source->format->write_buf_to_file)
                source->format->write_buf_to_file (source, refbuf);
753
        }
Karl Heyes's avatar
Karl Heyes committed
754
        /* lets see if we have too much data in the queue, but don't remove it until later */
755
        thread_mutex_lock(&source->lock);
Karl Heyes's avatar
Karl Heyes committed
756 757
        if (source->queue_size > source->queue_size_limit)
            remove_from_q = 1;
758
        thread_mutex_unlock(&source->lock);
759

760 761 762
        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

763 764 765 766 767 768
        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        client_node = avl_get_first(source->client_tree);
        while (client_node) {
            client = (client_t *)client_node->key;
Karl Heyes's avatar
Karl Heyes committed
769 770 771

            send_to_listener (source, client, remove_from_q);

772 773
            if (client->con->error) {
                client_node = avl_get_next(client_node);
774 775
                if (client->respcode == 200)
                    stats_event_dec (NULL, "listeners");
776
                avl_delete(source->client_tree, (void *)client, _free_client);
Michael Smith's avatar
Michael Smith committed
777
                source->listeners--;
778
                ICECAST_LOG_DEBUG("Client removed");
779 780 781 782 783 784 785 786
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        /** add pending clients **/
        client_node = avl_get_first(source->pending_tree);
        while (client_node) {
787

Michael Smith's avatar
Michael Smith committed
788
            if(source->max_listeners != -1 && 
789
                    source->listeners >= (unsigned long)source->max_listeners) 
Michael Smith's avatar
Michael Smith committed
790 791 792 793 794 795 796 797 798 799
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                client = (client_t *)client_node->key;
                client_node = avl_get_next(client_node);
                avl_delete(source->pending_tree, (void *)client, _free_client);

800
                ICECAST_LOG_INFO("Client deleted, exceeding maximum listeners for this "
ePirat's avatar
ePirat committed
801
                        "mountpoint (%s).", source->mount);
Michael Smith's avatar
Michael Smith committed
802 803 804 805
                continue;
            }
            
            /* Otherwise, the client is accepted, add it */
806
            avl_insert(source->client_tree, client_node->key);
Michael Smith's avatar
Michael Smith committed
807 808

            source->listeners++;
809
            ICECAST_LOG_DEBUG("Client added for mountpoint (%s)", source->mount);
810 811 812 813 814 815 816
            stats_event_inc(source->mount, "connections");

            client_node = avl_get_next(client_node);
        }

        /** clear pending tree **/
        while (avl_get_first(source->pending_tree)) {
Michael Smith's avatar
Michael Smith committed
817 818 819
            avl_delete(source->pending_tree, 
                    avl_get_first(source->pending_tree)->key, 
                    source_remove_client);
820 821 822 823 824
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

Karl Heyes's avatar
Karl Heyes committed
825
        /* update the stats if need be */
826
        if (source->listeners != source->prev_listeners)
Karl Heyes's avatar
Karl Heyes committed
827
        {
828
            source->prev_listeners = source->listeners;
829
            ICECAST_LOG_INFO("listener count on %s now %lu", source->mount, source->listeners);
Karl Heyes's avatar
Karl Heyes committed
830 831 832 833 834
            if (source->listeners > source->peak_listeners)
            {
                source->peak_listeners = source->listeners;
                stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
            }
835
            stats_event_args (source->mount, "listeners", "%lu", source->listeners);
836 837
            if (source->listeners == 0 && source->on_demand)
                source->running = 0;
Karl Heyes's avatar
Karl Heyes committed
838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854
        }

        /* lets reduce the queue, any lagging clients should of been
         * terminated by now
         */
        if (source->stream_data)
        {
            /* normal unreferenced queue data will have a refcount 1, but
             * burst queue data will be at least 2, active clients will also
             * increase refcount */
            while (source->stream_data->_count == 1)
            {
                refbuf_t *to_go = source->stream_data;

                if (to_go->next == NULL || source->burst_point == to_go)
                {
                    /* this should not happen */
855
                    ICECAST_LOG_ERROR("queue state is unexpected");
Karl Heyes's avatar
Karl Heyes committed
856 857 858 859 860
                    source->running = 0;
                    break;
                }
                source->stream_data = to_go->next;
                source->queue_size -= to_go->len;
861
                to_go->next = NULL;
Karl Heyes's avatar
Karl Heyes committed
862 863 864 865
                refbuf_release (to_go);
            }
        }

866 867 868
        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
Karl Heyes's avatar
Karl Heyes committed
869 870
    source_shutdown (source);
}
Jack Moffitt's avatar
Jack Moffitt committed
871

Michael Smith's avatar
Michael Smith committed
872

Karl Heyes's avatar
Karl Heyes committed
873 874
static void source_shutdown (source_t *source)
{
875 876
    mount_proxy *mountinfo;

877
    source->running = 0;
878
    ICECAST_LOG_INFO("Source from %s at \"%s\" exiting", source->con->ip, source->mount);
879

880
    mountinfo = config_find_mount (config_get_config(), source->mount, MOUNT_TYPE_NORMAL);
881 882 883 884
    if (mountinfo)
    {
        if (mountinfo->on_disconnect)
            source_run_script (mountinfo->on_disconnect, source->mount);
885
        auth_stream_end (mountinfo, source->mount);
886
    }
887 888
    config_release_config();

889 890
    /* we have de-activated the source now, so no more clients will be
     * added, now move the listeners we have to the fallback (if any)
891
     */
892 893 894
    if (source->fallback_mount)
    {
        source_t *fallback_source;
Michael Smith's avatar
Michael Smith committed
895

896 897
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount (source->fallback_mount);
Michael Smith's avatar
Michael Smith committed
898

899 900 901 902 903
        if (fallback_source != NULL)
            source_move_clients (source, fallback_source);

        avl_tree_unlock (global.source_tree);
    }
Jack Moffitt's avatar
Jack Moffitt committed
904

905
    /* delete this sources stats */
906
    stats_event(source->mount, NULL, NULL);
Jack Moffitt's avatar
Jack Moffitt committed
907

908 909 910 911
    /* we don't remove the source from the tree here, it may be a relay and
       therefore reserved */
    source_clear_source (source);

912 913
    global_lock();
    global.sources--;
Karl Heyes's avatar
Karl Heyes committed
914
    stats_event_args (NULL, "sources", "%d", global.sources);
915
    global_unlock();
Jack Moffitt's avatar
Jack Moffitt committed
916

917 918
    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
Jack Moffitt's avatar
Jack Moffitt committed
919 920
}

921

Jack Moffitt's avatar
Jack Moffitt committed
922 923
static int _compare_clients(void *compare_arg, void *a, void *b)
{
924 925 926 927 928
    client_t *clienta = (client_t *)a;
    client_t *clientb = (client_t *)b;

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
929

930 931
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
932

933
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
934 935
}

936
int source_remove_client(void *key)
Jack Moffitt's avatar
Jack Moffitt committed
937
{
938
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
939 940 941 942
}

static int _free_client(void *key)
{
943 944
    client_t *client = (client_t *)key;

945 946 947 948 949
    /* if no response has been sent then send a 404 */
    if (client->respcode == 0)
        client_send_404 (client, "Mount unavailable");
    else
        client_destroy(client);
950 951
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
952
}
953

954
static void _parse_audio_info (source_t *source, const char *s)
955
{
956
    const char *start = s;
Karl Heyes's avatar
Karl Heyes committed
957
    unsigned int len;
958 959 960 961 962 963 964 965 966 967 968 969

    while (start != NULL && *start != '\0')
    {
        if ((s = strchr (start, ';')) == NULL)
            len = strlen (start);
        else
        {
            len = (int)(s - start);
            s++; /* skip passed the ';' */
        }
        if (len)
        {
970
            char name[100], value[200];
971 972
            char *esc;

973
            sscanf (start, "%99[^=]=%199[^;\r\n]", name, value);
974 975 976 977
            esc = util_url_unescape (value);
            if (esc)
            {
                util_dict_set (source->audio_info, name, esc);
978
                stats_event (source->mount, name, esc);
979
                free (esc);
980 981
            }
        }
982
        start = s;
983 984
    }
}
985 986


987
/* Apply the mountinfo details to the source */
988
static void source_apply_mount (source_t *source, mount_proxy *mountinfo)