source.c 43.6 KB
Newer Older
1 2 3 4 5
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
6
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
7 8 9 10
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
Philipp Schafft's avatar
Philipp Schafft committed
11
 * Copyright 2012-2014, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
12 13
 */

14
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
15 16 17 18
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
19 20 21 22
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
23
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
24
#include <errno.h>
25

26
/* REVIEW: Are all those includes needed? */
27
#ifndef _WIN32
28
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
29
#include <sys/time.h>
30
#include <sys/socket.h>
31
#include <sys/wait.h>
32
#include <limits.h>
33 34 35
#ifndef PATH_MAX
#define PATH_MAX 4096
#endif
36
#else
37 38
#include <winsock2.h>
#include <windows.h>
39
#define snprintf _snprintf
40
#endif
Jack Moffitt's avatar
Jack Moffitt committed
41

Marvin Scholz's avatar
Marvin Scholz committed
42 43 44 45
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "common/httpp/httpp.h"
#include "common/net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
46 47 48 49 50 51

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
52
#include "logging.h"
53
#include "cfgfile.h"
54
#include "util.h"
Jack Moffitt's avatar
Jack Moffitt committed
55
#include "source.h"
Michael Smith's avatar
Michael Smith committed
56
#include "format.h"
57
#include "fserve.h"
Michael Smith's avatar
Michael Smith committed
58
#include "auth.h"
59
#include "event.h"
60
#include "compat.h"
Jack Moffitt's avatar
Jack Moffitt committed
61

62 63 64
#undef CATMODULE
#define CATMODULE "source"

Michael Smith's avatar
Michael Smith committed
65 66
#define MAX_FALLBACK_DEPTH 10

67 68
mutex_t move_clients_mutex;

Jack Moffitt's avatar
Jack Moffitt committed
69 70 71
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
72
static void _parse_audio_info (source_t *source, const char *s);
Karl Heyes's avatar
Karl Heyes committed
73
static void source_shutdown (source_t *source);
Jack Moffitt's avatar
Jack Moffitt committed
74

75 76 77 78 79 80 81 82
/* Allocate a new source with the stated mountpoint, if one already
 * exists with that mountpoint in the global source tree then return
 * NULL.
 */
source_t *source_reserve (const char *mount)
{
    source_t *src = NULL;

83
    if(mount[0] != '/')
84
        ICECAST_LOG_WARN("Source at \"%s\" does not start with '/', clients will be "
85 86
                "unable to connect", mount);

87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
    do
    {
        avl_tree_wlock (global.source_tree);
        src = source_find_mount_raw (mount);
        if (src)
        {
            src = NULL;
            break;
        }

        src = calloc (1, sizeof(source_t));
        if (src == NULL)
            break;

        src->client_tree = avl_tree_new(_compare_clients, NULL);
        src->pending_tree = avl_tree_new(_compare_clients, NULL);

        /* make duplicates for strings or similar */
105
        src->mount = strdup(mount);
106
        src->max_listeners = -1;
107
        thread_mutex_create(&src->lock);
108

109
        avl_insert(global.source_tree, src);
110 111 112

    } while (0);

113
    avl_tree_unlock(global.source_tree);
114 115 116 117
    return src;
}


Michael Smith's avatar
Michael Smith committed
118 119 120 121
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
122
{
123 124 125 126 127 128 129 130 131
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
132

133
    while (node) {
134
        source = (source_t *) node->key;
135
        cmp = strcmp(mount, source->mount);
136
        if (cmp < 0)
137 138 139 140 141 142
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
143

144 145
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
146 147
}

148 149

/* Search for mount, if the mount is there but not currently running then
Karl Heyes's avatar
Karl Heyes committed
150
 * check the fallback, and so on.  Must have a global source lock to call
151 152
 * this function.
 */
153
source_t *source_find_mount(const char *mount)
Michael Smith's avatar
Michael Smith committed
154
{
155
    source_t *source = NULL;
Michael Smith's avatar
Michael Smith committed
156
    ice_config_t *config;
157 158 159 160
    mount_proxy *mountinfo;
    int depth = 0;

    config = config_get_config();
161
    while (mount && depth < MAX_FALLBACK_DEPTH)
162 163
    {
        source = source_find_mount_raw(mount);
Michael Smith's avatar
Michael Smith committed
164

165 166 167 168 169
        if (source)
        {
            if (source->running || source->on_demand)
                break;
        }
Michael Smith's avatar
Michael Smith committed
170

171 172 173
        /* we either have a source which is not active (relay) or no source
         * at all. Check the mounts list for fallback settings
         */
174
        mountinfo = config_find_mount(config, mount, MOUNT_TYPE_NORMAL);
175
        source = NULL;
176 177 178 179

        if (mountinfo == NULL)
            break;
        mount = mountinfo->fallback_mount;
180
        depth++;
Michael Smith's avatar
Michael Smith committed
181 182
    }

183
    config_release_config();
Michael Smith's avatar
Michael Smith committed
184 185 186 187
    return source;
}


Jack Moffitt's avatar
Jack Moffitt committed
188 189
int source_compare_sources(void *arg, void *a, void *b)
{
190 191
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
192

193
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
194 195
}

196 197 198

/* Put a source back into an unused state.
 *
 * Destroys the source client, closes the dump and intro files, removes every
 * listener from the client and pending trees, frees the format plugin and
 * the queued stream data, and resets the per-stream counters and settings.
 * The source structure itself and its mount name are kept.
 *
 * The pending tree write lock is held for the whole call, the client tree
 * write lock while the listeners are removed.
 */
void source_clear_source (source_t *source)
{
    int c;

    ICECAST_LOG_DEBUG("clearing source \"%s\"", source->mount);

    avl_tree_wlock (source->pending_tree);

    /* log bytes read in access log. This has to happen while the source
     * client still exists; testing source->client after it has been
     * destroyed and set to NULL would never be true.
     */
    if (source->client && source->client->con && source->format)
        source->client->con->sent_bytes = source->format->read_bytes;

    client_destroy(source->client);
    source->client = NULL;
    source->parser = NULL;
    source->con = NULL;

    if (source->dumpfile)
    {
        ICECAST_LOG_INFO("Closing dumpfile for %s", source->mount);
        fclose (source->dumpfile);
        source->dumpfile = NULL;
    }

    /* lets kick off any clients that are left on here */
    avl_tree_wlock (source->client_tree);
    c=0;
    while (1)
    {
        avl_node *node = avl_get_first (source->client_tree);
        if (node)
        {
            client_t *client = node->key;
            if (client->respcode == 200)
                c++; /* only count clients that have had some processing */
            avl_delete (source->client_tree, client, _free_client);
            continue;
        }
        break;
    }
    if (c)
    {
        /* NOTE(review): a NULL source name presumably addresses the global stat - confirm */
        stats_event_sub (NULL, "listeners", source->listeners);
        ICECAST_LOG_INFO("%d active listeners on %s released", c, source->mount);
    }
    avl_tree_unlock (source->client_tree);

    while (avl_get_first (source->pending_tree))
    {
        avl_delete (source->pending_tree,
                avl_get_first(source->pending_tree)->key, _free_client);
    }

    if (source->format && source->format->free_plugin)
        source->format->free_plugin (source->format);
    source->format = NULL;

    /* Lets clear out the source queue too */
    while (source->stream_data)
    {
        refbuf_t *p = source->stream_data;
        source->stream_data = p->next;
        p->next = NULL;
        /* can be referenced by burst handler as well */
        while (p->_count > 1)
            refbuf_release (p);
        refbuf_release (p);
    }
    source->stream_data_tail = NULL;

    source->burst_point = NULL;
    source->burst_size = 0;
    source->burst_offset = 0;
    source->queue_size = 0;
    source->queue_size_limit = 0;
    source->listeners = 0;
    source->max_listeners = -1;
    source->prev_listeners = 0;
    source->hidden = 0;
    source->shoutcast_compat = 0;
    source->client_stats_update = 0;
    util_dict_free(source->audio_info);
    source->audio_info = NULL;

    free(source->fallback_mount);
    source->fallback_mount = NULL;

    free(source->dumpfilename);
    source->dumpfilename = NULL;

    if (source->intro_file)
    {
        fclose (source->intro_file);
        source->intro_file = NULL;
    }

    source->on_demand_req = 0;
    avl_tree_unlock (source->pending_tree);
}


297
/* Remove the provided source from the global tree and free it */
298
void source_free_source (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
299
{
300
    ICECAST_LOG_DEBUG("freeing source \"%s\"", source->mount);
301 302 303 304
    avl_tree_wlock (global.source_tree);
    avl_delete (global.source_tree, source, NULL);
    avl_tree_unlock (global.source_tree);

305 306
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
307

308 309 310
    /* make sure all YP entries have gone */
    yp_remove (source->mount);

311 312
    free (source->mount);
    free (source);
Jack Moffitt's avatar
Jack Moffitt committed
313

314
    return;
Jack Moffitt's avatar
Jack Moffitt committed
315
}
316

317

318 319 320
/* Search the source's client tree for the listener with connection id `id`.
 *
 * The lookup key is a throw-away client on the stack whose connection has
 * only the id filled in.
 * NOTE(review): this relies on the tree's compare function reading nothing
 * but con->id - confirm against _compare_clients.
 *
 * The client tree read lock is held for the lookup only. Returns the client
 * found, or NULL if there is none with that id.
 */
client_t *source_find_client(source_t *source, int id)
{
    connection_t probe_con;
    client_t probe;
    void *match = NULL;

    probe_con.id = id;
    probe.con = &probe_con;

    avl_tree_rlock(source->client_tree);
    if (avl_get_by_key(source->client_tree, &probe, &match) != 0)
        match = NULL;
    avl_tree_unlock(source->client_tree);

    return match;
}
337

338

339 340 341 342 343
/* Move clients from source to dest provided dest is running
 * and that the stream format is the same.
 * The only lock that should be held when this is called is the
 * source tree lock
 */
344
void source_move_clients(source_t *source, source_t *dest)
345
{
346
    unsigned long count = 0;
347 348
    if (strcmp (source->mount, dest->mount) == 0)
    {
349
        ICECAST_LOG_WARN("src and dst are the same \"%s\", skipping", source->mount);
350 351
        return;
    }
352 353 354 355
    /* we don't want the two write locks to deadlock in here */
    thread_mutex_lock (&move_clients_mutex);

    /* if the destination is not running then we can't move clients */
356

357
    avl_tree_wlock (dest->pending_tree);
358
    if (dest->running == 0 && dest->on_demand == 0)
359
    {
360
        ICECAST_LOG_WARN("destination mount %s not running, unable to move clients ", dest->mount);
361
        avl_tree_unlock (dest->pending_tree);
362
        thread_mutex_unlock (&move_clients_mutex);
363 364 365
        return;
    }

366
    do
367
    {
368
        client_t *client;
369

370 371
        /* we need to move the client and pending trees - we must take the
         * locks in this order to avoid deadlocks */
372 373
        avl_tree_wlock(source->pending_tree);
        avl_tree_wlock(source->client_tree);
374

375
        if (source->on_demand == 0 && source->format == NULL)
376
        {
377
            ICECAST_LOG_INFO("source mount %s is not available", source->mount);
378
            break;
379
        }
380
        if (source->format && dest->format)
381
        {
382 383
            if (source->format->type != dest->format->type)
            {
384
                ICECAST_LOG_WARN("stream %s and %s are of different types, ignored", source->mount, dest->mount);
385 386
                break;
            }
387
        }
388

389 390 391 392 393 394 395
        while (1)
        {
            avl_node *node = avl_get_first (source->pending_tree);
            if (node == NULL)
                break;
            client = (client_t *)(node->key);
            avl_delete (source->pending_tree, client, NULL);
396

397
            /* when switching a client to a different queue, be wary of the
398 399 400 401 402 403 404
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
405 406
                if (source->con == NULL)
                    client->intro_offset = -1;
407 408
            }

409
            avl_insert (dest->pending_tree, (void *)client);
410
            count++;
411 412 413 414 415 416 417 418 419 420 421
        }

        while (1)
        {
            avl_node *node = avl_get_first (source->client_tree);
            if (node == NULL)
                break;

            client = (client_t *)(node->key);
            avl_delete (source->client_tree, client, NULL);

422
            /* when switching a client to a different queue, be wary of the
423 424 425 426 427 428 429
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
430 431
                if (source->con == NULL)
                    client->intro_offset = -1;
432
            }
433
            avl_insert (dest->pending_tree, (void *)client);
434
            count++;
435
        }
436
        ICECAST_LOG_INFO("passing %lu listeners to \"%s\"", count, dest->mount);
437

438 439 440 441 442
        source->listeners = 0;
        stats_event (source->mount, "listeners", "0");

    } while (0);

443 444 445
    avl_tree_unlock (source->pending_tree);
    avl_tree_unlock (source->client_tree);

446 447 448 449
    /* see if we need to wake up an on-demand relay */
    if (dest->running == 0 && dest->on_demand && count)
        dest->on_demand_req = 1;

450 451 452 453
    avl_tree_unlock (dest->pending_tree);
    thread_mutex_unlock (&move_clients_mutex);
}

454

Karl Heyes's avatar
Karl Heyes committed
455 456 457 458 459 460 461 462 463 464 465
/* get some data from the source. The stream data is placed in a refbuf
 * and sent back, however NULL is also valid as in the case of a short
 * timeout and there's no data pending.
 *
 * Each pass waits up to `delay` milliseconds on the source socket (0 when
 * source->short_delay is set), refreshes the byte counters in the stats
 * once the update time has been reached, and then
 *   - on a wait error: stops the source unless the error is recoverable,
 *   - on a timeout: stops the source if nothing was read for longer than
 *     source->timeout seconds,
 *   - otherwise: asks the format plugin for the next buffer.
 * The loop only repeats when data was signalled but the plugin returned no
 * buffer.
 */
static refbuf_t *get_next_buffer (source_t *source)
{
    refbuf_t *refbuf = NULL;
    int delay = 250;

    if (source->short_delay)
        delay = 0;
    while (global.running == ICECAST_RUNNING && source->running)
    {
        int fds = 0;
        time_t current = time (NULL);
        connection_t *con;

        if (source->client)
            fds = util_timed_wait_for_fd (source->con->sock, delay);
        else
        {
            /* no client to wait on: just sleep, fds stays 0 so the timeout
             * branch below ends this call */
            thread_sleep (delay*1000);
            source->last_read = current;
        }

        if (current >= source->client_stats_update)
        {
            stats_event_args (source->mount, "total_bytes_read",
                    "%"PRIu64, source->format->read_bytes);
            stats_event_args (source->mount, "total_bytes_sent",
                    "%"PRIu64, source->format->sent_bytes);
            source->client_stats_update = current + 5;
        }
        if (fds < 0)
        {
            if (! sock_recoverable (sock_error()))
            {
                ICECAST_LOG_WARN("Error while waiting on socket, Disconnecting source");
                source->running = 0;
            }
            break;
        }
        if (fds == 0)
        {
            thread_mutex_lock(&source->lock);
            if ((source->last_read + (time_t)source->timeout) < current)
            {
                ICECAST_LOG_DEBUG("last %ld, timeout %d, now %ld", (long)source->last_read,
                        source->timeout, (long)current);
                ICECAST_LOG_WARN("Disconnecting source due to socket timeout");
                source->running = 0;
            }
            thread_mutex_unlock(&source->lock);
            break;
        }
        source->last_read = current;
        refbuf = source->format->get_buffer (source);

        /* fds is only non-zero when a client was present for the wait above.
         * The connection pointer is tested once and that test covers both
         * the TLS shutdown check and the error check.
         */
        con = source->client->con;
#ifdef HAVE_OPENSSL
        if (con && con->ssl && (SSL_get_shutdown(con->ssl) & SSL_RECEIVED_SHUTDOWN))
            con->error = 1;
#endif
        if (con && con->error)
        {
            ICECAST_LOG_INFO("End of Stream %s", source->mount);
            source->running = 0;
            continue;   /* the loop condition now fails, refbuf is returned as is */
        }
        if (refbuf)
            break;
    }

    return refbuf;
}


/* general send routine per listener.  The deletion_expected tells us whether
 * the last in the queue is about to disappear, so if this client is still
 * referring to it after writing then drop the client as it's fallen too far
532 533
 * behind
 */
Karl Heyes's avatar
Karl Heyes committed
534 535 536 537 538 539 540 541
static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
{
    int bytes;
    int loop = 10;   /* max number of iterations in one go */
    int total_written = 0;

    while (1)
    {
542 543 544 545
        /* check for limited listener time */
        if (client->con->discon_time)
            if (time(NULL) >= client->con->discon_time)
            {
546
                ICECAST_LOG_INFO("time limit reached for client #%lu", client->con->id);
547 548 549
                client->con->error = 1;
            }

Karl Heyes's avatar
Karl Heyes committed
550 551 552 553 554 555 556 557
        /* jump out if client connection has died */
        if (client->con->error)
            break;

        /* lets not send too much to one client in one go, but don't
           sleep for too long if more data can be sent */
        if (total_written > 20000 || loop == 0)
        {
558 559
            if (client->check_buffer != format_check_file_buffer)
                source->short_delay = 1;
Karl Heyes's avatar
Karl Heyes committed
560 561 562 563 564
            break;
        }

        loop--;

565
        if (client->check_buffer(source, client) < 0)
Karl Heyes's avatar
Karl Heyes committed
566 567
            break;

568
        bytes = client->write_to_client(client);
Karl Heyes's avatar
Karl Heyes committed
569
        if (bytes <= 0)
570
            break; /* can't write any more */
Karl Heyes's avatar
Karl Heyes committed
571 572 573

        total_written += bytes;
    }
574
    source->format->sent_bytes += total_written;
Karl Heyes's avatar
Karl Heyes committed
575 576 577

    /* the refbuf referenced at head (last in queue) may be marked for deletion
     * if so, check to see if this client is still referring to it */
Karl Heyes's avatar
Karl Heyes committed
578
    if (deletion_expected && client->refbuf && client->refbuf == source->stream_data)
Karl Heyes's avatar
Karl Heyes committed
579
    {
580
        ICECAST_LOG_INFO("Client %lu (%s) has fallen too far behind, removing",
581 582
                client->con->id, client->con->ip);
        stats_event_inc (source->mount, "slow_listeners");
Karl Heyes's avatar
Karl Heyes committed
583 584 585 586
        client->con->error = 1;
    }
}

Jack Moffitt's avatar
Jack Moffitt committed
587

588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610
/* Open the file for stream dumping.
 * This function should do all processing of the filename.
 *
 * On non-Windows builds the name is first run through strftime() with the
 * current local time, so it may contain conversions such as %Y or %H. On
 * Windows the name is used as given.
 *
 * Returns a stream opened for binary append, or NULL with errno set when
 * the local time cannot be determined, when the expanded name does not fit
 * into PATH_MAX bytes (errno is ENAMETOOLONG) or when fopen() fails.
 */
static FILE * source_open_dumpfile(const char * filename) {
#ifndef _WIN32
    /* some of the below functions seems not to be standard winapi functions */
    char buffer[PATH_MAX];
    time_t curtime;
    struct tm *loctime;

    /* Get the current time. */
    curtime = time (NULL);

    /* Convert it to local time representation.
     * NOTE(review): localtime() hands back shared static storage; switch to
     * localtime_r() if this can be reached from several threads at once.
     */
    loctime = localtime (&curtime);
    if (loctime == NULL)
        return NULL;

    /* strftime() returns 0 when the result does not fit and leaves the
     * buffer contents unspecified, so the buffer must not be used then */
    if (strftime (buffer, sizeof(buffer), filename, loctime) == 0)
    {
        errno = ENAMETOOLONG;
        return NULL;
    }
    filename = buffer;
#endif

    return fopen (filename, "ab");
}

Karl Heyes's avatar
Karl Heyes committed
611 612 613
/* Perform any initialisation just before the stream data is processed, the header
 * info is processed by now and the format details are setup
 */
614
static void source_init (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
615
{
616
    ice_config_t *config = config_get_config();
617 618
    char *listenurl;
    const char *str;
619 620
    int listen_url_size;

621
    /* 6 for max size of port */
622
    listen_url_size = strlen("http://") + strlen(config->hostname) +
623
        strlen(":") + 6 + strlen(source->mount) + 1;
624 625 626

    listenurl = malloc (listen_url_size);
    memset (listenurl, '\000', listen_url_size);
627 628
    snprintf (listenurl, listen_url_size, "http://%s:%d%s",
            config->hostname, config->port, source->mount);
629 630
    config_release_config();

631 632 633 634 635 636 637 638
    str = httpp_getvar(source->parser, "ice-audio-info");
    source->audio_info = util_dict_new();
    if (str)
    {
        _parse_audio_info (source, str);
        stats_event (source->mount, "audio_info", str);
    }

639 640
    stats_event (source->mount, "listenurl", listenurl);

Michael Smith's avatar
Michael Smith committed
641
    free(listenurl);
642

643 644
    if (source->dumpfilename != NULL)
    {
645
        source->dumpfile = source_open_dumpfile (source->dumpfilename);
646 647
        if (source->dumpfile == NULL)
        {
648
            ICECAST_LOG_WARN("Cannot open dump file \"%s\" for appending: %s, disabling.",
649 650 651 652
                    source->dumpfilename, strerror(errno));
        }
    }

653 654 655 656 657 658
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock (source->shutdown_rwlock);

    /* start off the statistics */
    source->listeners = 0;
    stats_event_inc (NULL, "source_total_connections");
659
    stats_event (source->mount, "slow_listeners", "0");
660
    stats_event_args (source->mount, "listeners", "%lu", source->listeners);
661
    stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
Karl Heyes's avatar
Karl Heyes committed
662
    stats_event_time (source->mount, "stream_start");
663
    stats_event_time_iso8601 (source->mount, "stream_start_iso8601");
664

665
    ICECAST_LOG_DEBUG("Source creation complete");
Karl Heyes's avatar
Karl Heyes committed
666
    source->last_read = time (NULL);
667
    source->prev_listeners = -1;
668
    source->running = 1;
669

670
    event_emit_clientevent("source-connect", source->client, source->mount);
671

Michael Smith's avatar
Michael Smith committed
672 673
    /*
    ** Now, if we have a fallback source and override is on, we want
674
    ** to steal its clients, because it means we've come back online
Michael Smith's avatar
Michael Smith committed
675 676 677 678
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

679 680 681 682
    if (source->fallback_override && source->fallback_mount)
    {
        source_t *fallback_source;

Michael Smith's avatar
Michael Smith committed
683 684 685
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);

686 687
        if (fallback_source)
            source_move_clients (fallback_source, source);
Michael Smith's avatar
Michael Smith committed
688

689
        avl_tree_unlock(global.source_tree);
Michael Smith's avatar
Michael Smith committed
690
    }
691 692 693 694 695
}


/* Main loop of a source.
 *
 * After source_init(), and for as long as the server and the source are
 * running, each pass
 *   1. fetches the next buffer (may be NULL), appends it to the stream
 *      queue, moves the burst point forward and writes the buffer to the
 *      dump file if there is one,
 *   2. sends to every listener in the client tree and removes those whose
 *      connection has an error,
 *   3. moves pending listeners into the client tree, or frees them once
 *      max_listeners is reached, and empties the pending tree,
 *   4. publishes the listener count when it has changed,
 *   5. drops buffers from the head of the queue that nothing refers to.
 * source_shutdown() is called when the loop ends.
 */
void source_main (source_t *source)
{
    avl_node *node;

    source_init (source);

    while (global.running == ICECAST_RUNNING && source->running) {
        int queue_over_limit;
        refbuf_t *incoming = get_next_buffer (source);

        source->short_delay = 0;

        if (incoming)
        {
            /* append buffer to the in-flight data queue,  */
            if (source->stream_data == NULL)
            {
                source->stream_data = incoming;
                source->burst_point = incoming;
            }
            if (source->stream_data_tail)
                source->stream_data_tail->next = incoming;
            source->stream_data_tail = incoming;
            source->queue_size += incoming->len;
            /* new buffer is referenced for burst */
            refbuf_addref(incoming);

            /* new data on queue, so check the burst point */
            source->burst_offset += incoming->len;
            while (source->burst_offset > source->burst_size)
            {
                refbuf_t *stale = source->burst_point;

                /* the burst point never moves past the newest buffer */
                if (stale->next == NULL)
                    break;

                source->burst_point = stale->next;
                source->burst_offset -= stale->len;
                refbuf_release(stale);
            }

            /* save stream to file */
            if (source->dumpfile && source->format->write_buf_to_file)
                source->format->write_buf_to_file(source, incoming);
        }

        /* lets see if we have too much data in the queue, but don't remove it until later */
        thread_mutex_lock(&source->lock);
        queue_over_limit = (source->queue_size > source->queue_size_limit) ? 1 : 0;
        thread_mutex_unlock(&source->lock);

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        node = avl_get_first(source->client_tree);
        while (node) {
            client_t *listener = (client_t *) node->key;

            send_to_listener(source, listener, queue_over_limit);

            /* step to the next node first, this one may be deleted below */
            node = avl_get_next(node);

            if (listener->con->error) {
                if (listener->respcode == 200)
                    stats_event_dec(NULL, "listeners");
                avl_delete(source->client_tree, (void *) listener, _free_client);
                source->listeners--;
                ICECAST_LOG_DEBUG("Client removed");
            }
        }

        /** add pending clients **/
        node = avl_get_first(source->pending_tree);
        while (node) {
            client_t *waiting = (client_t *) node->key;

            /* step on before the node can be deleted */
            node = avl_get_next(node);

            if(source->max_listeners != -1 &&
                    source->listeners >= (unsigned long)source->max_listeners)
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                avl_delete(source->pending_tree, (void *)waiting, _free_client);

                ICECAST_LOG_INFO("Client deleted, exceeding maximum listeners for this "
                        "mountpoint (%s).", source->mount);
                continue;
            }

            /* Otherwise, the client is accepted, add it */
            avl_insert(source->client_tree, waiting);

            source->listeners++;
            ICECAST_LOG_DEBUG("Client added for mountpoint (%s)", source->mount);
            stats_event_inc(source->mount, "connections");
        }

        /** clear pending tree **/
        while ((node = avl_get_first(source->pending_tree)) != NULL) {
            avl_delete(source->pending_tree, node->key, source_remove_client);
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* update the stats if need be */
        if (source->listeners != source->prev_listeners)
        {
            source->prev_listeners = source->listeners;
            ICECAST_LOG_INFO("listener count on %s now %lu", source->mount, source->listeners);
            if (source->listeners > source->peak_listeners)
            {
                source->peak_listeners = source->listeners;
                stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
            }
            stats_event_args (source->mount, "listeners", "%lu", source->listeners);
            if (source->listeners == 0 && source->on_demand)
                source->running = 0;
        }

        /* lets reduce the queue, any lagging clients should have been
         * terminated by now
         */
        if (source->stream_data)
        {
            /* normal unreferenced queue data will have a refcount 1, but
             * burst queue data will be at least 2, active clients will also
             * increase refcount */
            while (source->stream_data->_count == 1)
            {
                refbuf_t *oldest = source->stream_data;

                if (oldest->next == NULL || source->burst_point == oldest)
                {
                    /* this should not happen */
                    ICECAST_LOG_ERROR("queue state is unexpected");
                    source->running = 0;
                    break;
                }
                source->stream_data = oldest->next;
                source->queue_size -= oldest->len;
                oldest->next = NULL;
                refbuf_release (oldest);
            }
        }

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
    source_shutdown (source);
}
Jack Moffitt's avatar
Jack Moffitt committed
862

Michael Smith's avatar
Michael Smith committed
863

Karl Heyes's avatar
Karl Heyes committed
864 865
/* Shut down a source and hand its remaining resources back.
 *
 * Steps, in order:
 *   1. mark the source as no longer running, log the exit and emit the
 *      "source-disconnect" client event;
 *   2. if a fallback mount is configured and can be found, move the
 *      listeners over to it;
 *   3. delete the stats kept for this mount;
 *   4. if the source client sent an "Expect: 100-continue" request header,
 *      detach the client from the source, write a final 200 response into
 *      its refbuf and pass it to fserve_add_client();
 *   5. clear the source, decrement the global source counter and publish
 *      the new count;
 *   6. release source->shutdown_rwlock.
 *
 * source: the source being shut down. source->con and source->mount are
 *         dereferenced unconditionally, so they must be valid.
 */
static void source_shutdown (source_t *source)
{
    source->running = 0;
    ICECAST_LOG_INFO("Source from %s at \"%s\" exiting", source->con->ip, source->mount);

    event_emit_clientevent("source-disconnect", source->client, source->mount);

    /* we have de-activated the source now, so no more clients will be
     * added, now move the listeners we have to the fallback (if any)
     */
    if (source->fallback_mount)
    {
        source_t *fallback_source;

        /* hold the source tree read lock for the lookup and the move */
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);

        if (fallback_source != NULL)
            source_move_clients(source, fallback_source);

        avl_tree_unlock(global.source_tree);
    }

    /* delete this sources stats */
    stats_event(source->mount, NULL, NULL);

    if (source->client && source->parser) {
        /* For PUT support we check for 100-continue and send back a final 200. */
        const char *expectcontinue = httpp_getvar(source->parser, "expect");

        if (expectcontinue != NULL) {
#ifdef HAVE_STRCASESTR
            if (strcasestr (expectcontinue, "100-continue") != NULL)
#else
            ICECAST_LOG_WARN("OS doesn't support case insenestive substring checks...");
            if (strstr (expectcontinue, "100-continue") != NULL)
#endif
            {
                client_t *client = source->client;
                source->client = NULL; /* detach client from source. */

                /* Pass the detached client itself: source->client has just
                 * been cleared, so reading it here would always hand NULL
                 * to the header builder. */
                util_http_build_header(client->refbuf->data, PER_CLIENT_REFBUF_SIZE, 0, 0, 200, NULL, NULL, NULL, "", NULL, client);
                client->refbuf->len = strlen(client->refbuf->data);
                /* only the freshly built header is to be sent: drop any
                 * chained buffers and restart from the beginning */
                refbuf_release(client->refbuf->next);
                client->refbuf->next = NULL;
                client->pos = 0;
                fserve_add_client(client, NULL);
            }
        }
    }

    /* we don't remove the source from the tree here, it may be a relay and
     * therefore reserved */
    source_clear_source(source);

    /* the global source counter is updated under the global lock */
    global_lock();
    global.sources--;
    stats_event_args(NULL, "sources", "%d", global.sources);
    global_unlock();

    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
}

928

Jack Moffitt's avatar
Jack Moffitt committed
929 930
static int _compare_clients(void *compare_arg, void *a, void *b)
{
931 932
    client_t *clienta = (client_t *) a;
    client_t *clientb = (client_t *) b;
933 934 935

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
936

937 938
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
939

940
    return 0;