source.c 43.6 KB
Newer Older
1 2 3 4 5
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
6
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
7 8 9 10
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
11
 * Copyright 2012-2018, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
12 13
 */

14
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
15 16 17 18
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
19 20 21 22
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
23
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
24
#include <errno.h>
25

26
/* REVIEW: Are all those includes needed? */
27
#ifndef _WIN32
28
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
29
#include <sys/time.h>
30
#include <sys/socket.h>
31
#include <sys/wait.h>
32
#include <limits.h>
33 34 35
#ifndef PATH_MAX
#define PATH_MAX 4096
#endif
36
#else
37 38
#include <winsock2.h>
#include <windows.h>
39
#define snprintf _snprintf
40
#endif
Jack Moffitt's avatar
Jack Moffitt committed
41

42 43 44
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "common/httpp/httpp.h"
Jack Moffitt's avatar
Jack Moffitt committed
45

46 47
#include "source.h"
#include "compat.h"
Jack Moffitt's avatar
Jack Moffitt committed
48 49 50 51
#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
52
#include "errors.h"
Jack Moffitt's avatar
Jack Moffitt committed
53
#include "stats.h"
54
#include "logging.h"
55
#include "cfgfile.h"
56
#include "util.h"
57
#include "format.h"
58
#include "fserve.h"
59
#include "auth.h"
60
#include "event.h"
61 62
#include "slave.h"
#include "acl.h"
Jack Moffitt's avatar
Jack Moffitt committed
63

64 65 66
#undef CATMODULE
#define CATMODULE "source"

67 68
#define MAX_FALLBACK_DEPTH 10

69 70
mutex_t move_clients_mutex;

Jack Moffitt's avatar
Jack Moffitt committed
71 72 73
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
74
static void _parse_audio_info (source_t *source, const char *s);
75
static void source_shutdown (source_t *source);
Jack Moffitt's avatar
Jack Moffitt committed
76

77 78 79 80 81 82 83 84
/* Allocate a new source with the stated mountpoint, if one already
 * exists with that mountpoint in the global source tree then return
 * NULL.
 */
source_t *source_reserve (const char *mount)
{
    source_t *src = NULL;

85
    if(mount[0] != '/')
86
        ICECAST_LOG_WARN("Source at \"%s\" does not start with '/', clients will be "
87 88
                "unable to connect", mount);

89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
    do
    {
        avl_tree_wlock (global.source_tree);
        src = source_find_mount_raw (mount);
        if (src)
        {
            src = NULL;
            break;
        }

        src = calloc (1, sizeof(source_t));
        if (src == NULL)
            break;

        src->client_tree = avl_tree_new(_compare_clients, NULL);
        src->pending_tree = avl_tree_new(_compare_clients, NULL);
105
        src->history = playlist_new(10 /* DOCUMENT: default is max_tracks=10. */);
106 107

        /* make duplicates for strings or similar */
108
        src->mount = strdup(mount);
109
        src->max_listeners = -1;
110
        thread_mutex_create(&src->lock);
111

112
        avl_insert(global.source_tree, src);
113 114 115

    } while (0);

116
    avl_tree_unlock(global.source_tree);
117 118 119 120
    return src;
}


121 122 123 124
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
125
{
126 127 128 129 130 131 132 133 134
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
135

136
    while (node) {
137
        source = (source_t *) node->key;
138
        cmp = strcmp(mount, source->mount);
139
        if (cmp < 0)
140 141 142 143 144 145
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
146

147 148
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
149 150
}

151 152

/* Search for mount, if the mount is there but not currently running then
Karl Heyes's avatar
Karl Heyes committed
153
 * check the fallback, and so on.  Must have a global source lock to call
154 155
 * this function.
 */
156
source_t *source_find_mount(const char *mount)
157
{
158
    source_t *source = NULL;
159
    ice_config_t *config;
160 161 162 163
    mount_proxy *mountinfo;
    int depth = 0;

    config = config_get_config();
164
    while (mount && depth < MAX_FALLBACK_DEPTH)
165 166
    {
        source = source_find_mount_raw(mount);
167

168 169 170 171 172
        if (source)
        {
            if (source->running || source->on_demand)
                break;
        }
173

174 175 176
        /* we either have a source which is not active (relay) or no source
         * at all. Check the mounts list for fallback settings
         */
177
        mountinfo = config_find_mount(config, mount, MOUNT_TYPE_NORMAL);
178
        source = NULL;
179 180 181 182

        if (mountinfo == NULL)
            break;
        mount = mountinfo->fallback_mount;
183
        depth++;
184 185
    }

186
    config_release_config();
187 188 189 190
    return source;
}


Jack Moffitt's avatar
Jack Moffitt committed
191 192
int source_compare_sources(void *arg, void *a, void *b)
{
193 194
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
195

196 197
    (void)arg;

198
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
199 200
}

201 202 203

/* Reset a source to an idle state without taking it out of the global
 * source tree: the source client is destroyed, the dump and intro files are
 * closed, all entries of the client and pending trees are deleted via
 * _free_client, the format plugin and the queued stream data are released
 * and the per-stream fields are reset.
 *
 * The pending tree write lock is held for the whole operation, the client
 * tree write lock while the listeners are removed.
 */
void source_clear_source (source_t *source)
{
    int c;
    avl_node *node;

    ICECAST_LOG_DEBUG("clearing source \"%s\"", source->mount);

    avl_tree_wlock (source->pending_tree);

    /* log bytes read in access log.
     * This must be done while the source client still exists, i.e. before
     * it is destroyed and the pointer is cleared just below. */
    if (source->client && source->client->con && source->format)
        source->client->con->sent_bytes = source->format->read_bytes;

    client_destroy(source->client);
    source->client = NULL;
    source->parser = NULL;
    source->con = NULL;

    if (source->dumpfile)
    {
        ICECAST_LOG_INFO("Closing dumpfile for %s", source->mount);
        fclose (source->dumpfile);
        source->dumpfile = NULL;
    }

    /* lets kick off any clients that are left on here */
    avl_tree_wlock (source->client_tree);
    c = 0;
    while ((node = avl_get_first (source->client_tree)) != NULL)
    {
        client_t *client = node->key;

        if (client->respcode == 200)
            c++; /* only count clients that have had some processing */
        avl_delete (source->client_tree, client, _free_client);
    }
    if (c)
    {
        stats_event_sub (NULL, "listeners", source->listeners);
        ICECAST_LOG_INFO("%d active listeners on %s released", c, source->mount);
    }
    avl_tree_unlock (source->client_tree);

    while ((node = avl_get_first (source->pending_tree)) != NULL)
        avl_delete (source->pending_tree, node->key, _free_client);

    if (source->format && source->format->free_plugin)
        source->format->free_plugin (source->format);
    source->format = NULL;

    /* Lets clear out the source queue too */
    while (source->stream_data)
    {
        refbuf_t *p = source->stream_data;

        source->stream_data = p->next;
        p->next = NULL;
        /* can be referenced by burst handler as well */
        while (p->_count > 1)
            refbuf_release (p);
        refbuf_release (p);
    }
    source->stream_data_tail = NULL;

    source->burst_point = NULL;
    source->burst_size = 0;
    source->burst_offset = 0;
    source->queue_size = 0;
    source->queue_size_limit = 0;
    source->listeners = 0;
    source->max_listeners = -1;
    source->prev_listeners = 0;
    source->hidden = 0;
    source->shoutcast_compat = 0;
    source->client_stats_update = 0;
    util_dict_free(source->audio_info);
    source->audio_info = NULL;

    free(source->fallback_mount);
    source->fallback_mount = NULL;

    free(source->dumpfilename);
    source->dumpfilename = NULL;

    playlist_release(source->history);
    source->history = NULL;

    if (source->intro_file)
    {
        fclose (source->intro_file);
        source->intro_file = NULL;
    }

    source->on_demand_req = 0;
    avl_tree_unlock (source->pending_tree);
}


305
/* Remove the provided source from the global tree and free it */
306
void source_free_source (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
307
{
308
    ICECAST_LOG_DEBUG("freeing source \"%s\"", source->mount);
309 310 311 312
    avl_tree_wlock (global.source_tree);
    avl_delete (global.source_tree, source, NULL);
    avl_tree_unlock (global.source_tree);

313 314
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
315

316 317 318
    /* make sure all YP entries have gone */
    yp_remove (source->mount);

319 320
    free (source->mount);
    free (source);
Jack Moffitt's avatar
Jack Moffitt committed
321

322
    return;
Jack Moffitt's avatar
Jack Moffitt committed
323
}
324

325

326 327 328
/* Look up a listener of `source` by connection id.
 *
 * A throw-away client/connection pair carrying only the id is used as the
 * search key for the client tree. The tree is read-locked during the
 * lookup only.
 *
 * Returns the matching client, or NULL if the tree holds no such entry.
 */
client_t *source_find_client(source_t *source, int id)
{
    connection_t key_con;
    client_t key_client;
    void *match;
    client_t *found = NULL;

    key_client.con = &key_con;
    key_client.con->id = id;

    avl_tree_rlock(source->client_tree);
    if (avl_get_by_key(source->client_tree, &key_client, &match) == 0)
        found = match;
    avl_tree_unlock(source->client_tree);

    return found;
}
345

346

347 348 349 350 351
/* Move clients from source to dest provided dest is running
 * and that the stream format is the same.
 * The only lock that should be held when this is called is the
 * source tree lock
 */
352
void source_move_clients(source_t *source, source_t *dest)
353
{
354
    unsigned long count = 0;
355 356
    if (strcmp (source->mount, dest->mount) == 0)
    {
357
        ICECAST_LOG_WARN("src and dst are the same \"%s\", skipping", source->mount);
358 359
        return;
    }
360 361 362 363
    /* we don't want the two write locks to deadlock in here */
    thread_mutex_lock (&move_clients_mutex);

    /* if the destination is not running then we can't move clients */
364

365
    avl_tree_wlock (dest->pending_tree);
366
    if (dest->running == 0 && dest->on_demand == 0)
367
    {
368
        ICECAST_LOG_WARN("destination mount %s not running, unable to move clients ", dest->mount);
369
        avl_tree_unlock (dest->pending_tree);
370
        thread_mutex_unlock (&move_clients_mutex);
371 372 373
        return;
    }

374
    do
375
    {
376
        client_t *client;
377

378 379
        /* we need to move the client and pending trees - we must take the
         * locks in this order to avoid deadlocks */
380 381
        avl_tree_wlock(source->pending_tree);
        avl_tree_wlock(source->client_tree);
382

383
        if (source->on_demand == 0 && source->format == NULL)
384
        {
385
            ICECAST_LOG_INFO("source mount %s is not available", source->mount);
386
            break;
387
        }
388
        if (source->format && dest->format)
389
        {
390 391
            if (source->format->type != dest->format->type)
            {
392
                ICECAST_LOG_WARN("stream %s and %s are of different types, ignored", source->mount, dest->mount);
393 394
                break;
            }
395
        }
396

397 398 399 400 401 402 403
        while (1)
        {
            avl_node *node = avl_get_first (source->pending_tree);
            if (node == NULL)
                break;
            client = (client_t *)(node->key);
            avl_delete (source->pending_tree, client, NULL);
404

405
            /* when switching a client to a different queue, be wary of the
406 407 408 409 410 411 412
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
413 414
                if (source->con == NULL)
                    client->intro_offset = -1;
415 416
            }

417
            avl_insert (dest->pending_tree, (void *)client);
418
            count++;
419 420 421 422 423 424 425 426 427 428 429
        }

        while (1)
        {
            avl_node *node = avl_get_first (source->client_tree);
            if (node == NULL)
                break;

            client = (client_t *)(node->key);
            avl_delete (source->client_tree, client, NULL);

430
            /* when switching a client to a different queue, be wary of the
431 432 433 434 435 436 437
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
438 439
                if (source->con == NULL)
                    client->intro_offset = -1;
440
            }
441
            avl_insert (dest->pending_tree, (void *)client);
442
            count++;
443
        }
444
        ICECAST_LOG_INFO("passing %lu listeners to \"%s\"", count, dest->mount);
445

446 447 448 449 450
        source->listeners = 0;
        stats_event (source->mount, "listeners", "0");

    } while (0);

451 452 453
    avl_tree_unlock (source->pending_tree);
    avl_tree_unlock (source->client_tree);

454 455 456 457
    /* see if we need to wake up an on-demand relay */
    if (dest->running == 0 && dest->on_demand && count)
        dest->on_demand_req = 1;

458 459 460 461
    avl_tree_unlock (dest->pending_tree);
    thread_mutex_unlock (&move_clients_mutex);
}

462

463 464 465 466 467 468 469 470 471 472 473
/* Fetch the next block of stream data from the source.
 *
 * Returns a refbuf holding stream data. NULL is a valid result as well: it
 * is returned after a wait that produced no data, on a socket error, on a
 * source timeout, or when the source/server stops running.
 * source->running is cleared on unrecoverable socket errors, on timeout and
 * at end of stream.
 */
static refbuf_t *get_next_buffer (source_t *source)
{
    refbuf_t *buf = NULL;
    /* no waiting at all if a listener asked for a short delay */
    const int wait = source->short_delay ? 0 : 250;

    while (global.running == ICECAST_RUNNING && source->running)
    {
        int ready = 0;
        time_t now = time (NULL);

        if (source->client)
        {
            ready = util_timed_wait_for_fd (source->con->sock, wait);
        }
        else
        {
            /* no source client to wait on, just sleep and treat the source
             * as having been read from */
            thread_sleep (wait*1000);
            source->last_read = now;
        }

        /* refresh the byte counters in the stats every 5 seconds */
        if (now >= source->client_stats_update)
        {
            stats_event_args (source->mount, "total_bytes_read",
                    "%"PRIu64, source->format->read_bytes);
            stats_event_args (source->mount, "total_bytes_sent",
                    "%"PRIu64, source->format->sent_bytes);
            source->client_stats_update = now + 5;
        }

        if (ready < 0)
        {
            if (! sock_recoverable (sock_error()))
            {
                ICECAST_LOG_WARN("Error while waiting on socket, Disconnecting source");
                source->running = 0;
            }
            break;
        }

        if (ready == 0)
        {
            /* nothing arrived in time, check for the source timeout */
            thread_mutex_lock(&source->lock);
            if ((source->last_read + (time_t)source->timeout) < now)
            {
                ICECAST_LOG_DEBUG("last %ld, timeout %d, now %ld", (long)source->last_read,
                        source->timeout, (long)now);
                ICECAST_LOG_WARN("Disconnecting source due to socket timeout");
                source->running = 0;
            }
            thread_mutex_unlock(&source->lock);
            break;
        }

        source->last_read = now;
        buf = source->format->get_buffer (source);
        if (client_body_eof(source->client)) {
            ICECAST_LOG_INFO("End of Stream %s", source->mount);
            source->running = 0;
            continue; /* loop condition fails now, buf is returned as is */
        }
        if (buf)
            break;
    }

    return buf;
}


/* general send routine per listener.  The deletion_expected tells us whether
 * the last in the queue is about to disappear, so if this client is still
 * referring to it after writing then drop the client as it's fallen too far
535 536
 * behind
 */
537 538 539 540 541 542 543 544
static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
{
    int bytes;
    int loop = 10;   /* max number of iterations in one go */
    int total_written = 0;

    while (1)
    {
545 546 547 548
        /* check for limited listener time */
        if (client->con->discon_time)
            if (time(NULL) >= client->con->discon_time)
            {
549
                ICECAST_LOG_INFO("time limit reached for client #%lu", client->con->id);
550 551 552
                client->con->error = 1;
            }

553 554 555 556 557 558 559 560
        /* jump out if client connection has died */
        if (client->con->error)
            break;

        /* lets not send too much to one client in one go, but don't
           sleep for too long if more data can be sent */
        if (total_written > 20000 || loop == 0)
        {
561 562
            if (client->check_buffer != format_check_file_buffer)
                source->short_delay = 1;
563 564 565 566 567
            break;
        }

        loop--;

568
        if (client->check_buffer(source, client) < 0)
569 570
            break;

571
        bytes = client->write_to_client(client);
572
        if (bytes <= 0)
573
            break; /* can't write any more */
574 575 576

        total_written += bytes;
    }
577
    source->format->sent_bytes += total_written;
578 579 580

    /* the refbuf referenced at head (last in queue) may be marked for deletion
     * if so, check to see if this client is still referring to it */
581
    if (deletion_expected && client->refbuf && client->refbuf == source->stream_data)
582
    {
583
        ICECAST_LOG_INFO("Client %lu (%s) has fallen too far behind, removing",
584 585
                client->con->id, client->con->ip);
        stats_event_inc (source->mount, "slow_listeners");
586 587 588 589
        client->con->error = 1;
    }
}

Jack Moffitt's avatar
Jack Moffitt committed
590

591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613
/* Open the file for stream dumping.
 * This function should do all processing of the filename.
 *
 * On non-Windows builds the filename is used as a strftime() pattern and
 * expanded with the current local time before the file is opened.
 * The file is opened in binary append mode.
 *
 * Returns the opened stream, or NULL with errno set when
 *   - filename is NULL (EINVAL),
 *   - the local time cannot be determined,
 *   - the expanded name is empty or does not fit into PATH_MAX bytes
 *     (ENAMETOOLONG), or
 *   - fopen() fails.
 */
static FILE * source_open_dumpfile(const char * filename) {
#ifndef _WIN32
    /* some of the below functions seems not to be standard winapi functions */
    char buffer[PATH_MAX];
    time_t curtime;
    struct tm *loctime;
#endif

    if (filename == NULL) {
        errno = EINVAL;
        return NULL;
    }

#ifndef _WIN32
    /* Get the current time. */
    curtime = time (NULL);

    /* Convert it to local time representation. */
    loctime = localtime (&curtime);
    if (loctime == NULL)
        return NULL;

    /* strftime() returns 0 if the result does not fit; the buffer content
     * is indeterminate in that case and must not be used as a file name. */
    if (strftime (buffer, sizeof(buffer), filename, loctime) == 0) {
        errno = ENAMETOOLONG;
        return NULL;
    }
    filename = buffer;
#endif

    return fopen (filename, "ab");
}

Karl Heyes's avatar
Karl Heyes committed
614 615 616
/* Perform any initialisation just before the stream data is processed, the header
 * info is processed by now and the format details are setup
 */
617
static void source_init (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
618
{
619
    char listenurl[512];
620
    const char *str;
621

622 623 624 625 626 627 628 629
    str = httpp_getvar(source->parser, "ice-audio-info");
    source->audio_info = util_dict_new();
    if (str)
    {
        _parse_audio_info (source, str);
        stats_event (source->mount, "audio_info", str);
    }

630
    client_get_baseurl(NULL, NULL, listenurl, sizeof(listenurl), NULL, NULL, NULL, NULL, NULL);
631 632
    stats_event (source->mount, "listenurl", listenurl);

633 634
    if (source->dumpfilename != NULL)
    {
635
        source->dumpfile = source_open_dumpfile (source->dumpfilename);
636 637
        if (source->dumpfile == NULL)
        {
638
            ICECAST_LOG_WARN("Cannot open dump file \"%s\" for appending: %s, disabling.",
639 640 641 642
                    source->dumpfilename, strerror(errno));
        }
    }

643 644 645 646 647 648
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock (source->shutdown_rwlock);

    /* start off the statistics */
    source->listeners = 0;
    stats_event_inc (NULL, "source_total_connections");
649
    stats_event (source->mount, "slow_listeners", "0");
650
    stats_event_args (source->mount, "listeners", "%lu", source->listeners);
651
    stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
Karl Heyes's avatar
Karl Heyes committed
652
    stats_event_time (source->mount, "stream_start");
653
    stats_event_time_iso8601 (source->mount, "stream_start_iso8601");
654

655
    ICECAST_LOG_DEBUG("Source creation complete");
656
    source->last_read = time (NULL);
657
    source->prev_listeners = -1;
658
    source->running = 1;
659

660
    event_emit_clientevent("source-connect", source->client, source->mount);
661

662 663
    /*
    ** Now, if we have a fallback source and override is on, we want
664
    ** to steal its clients, because it means we've come back online
665 666 667 668
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

669 670 671 672
    if (source->fallback_override && source->fallback_mount)
    {
        source_t *fallback_source;

673 674 675
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);

676 677
        if (fallback_source)
            source_move_clients (fallback_source, source);
678

679
        avl_tree_unlock(global.source_tree);
680
    }
681 682 683 684 685
}


/* Main loop of a source.
 *
 * After source_init() this repeats, for as long as both the server and the
 * source are running:
 *   1. fetch the next buffer and append it to the stream queue, moving the
 *      burst point along and writing to the dump file if there is one,
 *   2. send queued data to every listener and drop those in error,
 *   3. promote pending listeners to active ones (respecting max_listeners),
 *   4. publish listener stats when the count changed,
 *   5. trim buffers at the front of the queue whose reference count
 *      dropped to 1.
 * source_shutdown() is called once the loop ends.
 */
void source_main (source_t *source)
{
    source_init (source);

    while (global.running == ICECAST_RUNNING && source->running) {
        refbuf_t *refbuf = get_next_buffer (source);
        int remove_from_q = 0;
        avl_node *node;

        source->short_delay = 0;

        if (refbuf)
        {
            /* append buffer to the in-flight data queue,  */
            if (source->stream_data == NULL)
            {
                source->stream_data = refbuf;
                source->burst_point = refbuf;
            }
            if (source->stream_data_tail)
                source->stream_data_tail->next = refbuf;
            source->stream_data_tail = refbuf;
            source->queue_size += refbuf->len;
            /* new buffer is referenced for burst */
            refbuf_addref(refbuf);

            /* new data on queue, so check the burst point */
            source->burst_offset += refbuf->len;
            while (source->burst_offset > source->burst_size)
            {
                refbuf_t *to_release = source->burst_point;

                /* never move the burst point past the end of the queue */
                if (to_release->next == NULL)
                    break;

                source->burst_point = to_release->next;
                source->burst_offset -= to_release->len;
                refbuf_release(to_release);
            }

            /* save stream to file */
            if (source->dumpfile && source->format->write_buf_to_file)
                source->format->write_buf_to_file(source, refbuf);
        }

        /* lets see if we have too much data in the queue, but don't remove it until later */
        thread_mutex_lock(&source->lock);
        if (source->queue_size > source->queue_size_limit)
            remove_from_q = 1;
        thread_mutex_unlock(&source->lock);

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        /** serve the active listeners, dropping the ones in error **/
        node = avl_get_first(source->client_tree);
        while (node) {
            client_t *client = (client_t *) node->key;
            avl_node *next;

            send_to_listener(source, client, remove_from_q);

            /* step on before the current node may be deleted */
            next = avl_get_next(node);
            if (client->con->error) {
                if (client->respcode == 200)
                    stats_event_dec(NULL, "listeners");
                avl_delete(source->client_tree, (void *) client, _free_client);
                source->listeners--;
                ICECAST_LOG_DEBUG("Client removed");
            }
            node = next;
        }

        /** add pending clients **/
        node = avl_get_first(source->pending_tree);
        while (node) {
            avl_node *next = avl_get_next(node);

            if (source->max_listeners != -1 &&
                    source->listeners >= (unsigned long)source->max_listeners)
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                avl_delete(source->pending_tree, node->key, _free_client);

                ICECAST_LOG_INFO("Client deleted, exceeding maximum listeners for this "
                        "mountpoint (%s).", source->mount);
            }
            else
            {
                /* Otherwise, the client is accepted, add it */
                avl_insert(source->client_tree, node->key);

                source->listeners++;
                ICECAST_LOG_DEBUG("Client added for mountpoint (%s)", source->mount);
                stats_event_inc(source->mount, "connections");
            }
            node = next;
        }

        /** clear pending tree **/
        while ((node = avl_get_first(source->pending_tree)) != NULL) {
            avl_delete(source->pending_tree, node->key, source_remove_client);
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* update the stats if need be */
        if (source->listeners != source->prev_listeners)
        {
            source->prev_listeners = source->listeners;
            ICECAST_LOG_INFO("listener count on %s now %lu", source->mount, source->listeners);
            if (source->listeners > source->peak_listeners)
            {
                source->peak_listeners = source->listeners;
                stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
            }
            stats_event_args (source->mount, "listeners", "%lu", source->listeners);
            /* an on-demand source stops once its last listener has gone */
            if (source->listeners == 0 && source->on_demand)
                source->running = 0;
        }

        /* lets reduce the queue, any lagging clients should have been
         * terminated by now
         */
        if (source->stream_data)
        {
            /* normal unreferenced queue data will have a refcount 1, but
             * burst queue data will be at least 2, active clients will also
             * increase refcount */
            while (source->stream_data->_count == 1)
            {
                refbuf_t *to_go = source->stream_data;

                if (to_go->next == NULL || source->burst_point == to_go)
                {
                    /* this should not happen */
                    ICECAST_LOG_ERROR("queue state is unexpected");
                    source->running = 0;
                    break;
                }
                source->stream_data = to_go->next;
                source->queue_size -= to_go->len;
                to_go->next = NULL;
                refbuf_release (to_go);
            }
        }

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
    source_shutdown (source);
}
Jack Moffitt's avatar
Jack Moffitt committed
852

Michael Smith's avatar
Michael Smith committed
853

854 855
/* Stop a source: mark it as not running, emit the disconnect event, move
 * the listeners to the fallback mount (if one is set and can be found),
 * delete the stats of the mount, answer a source client that sent
 * "Expect: 100-continue", clear the source, decrease the global source
 * count and finally release the shutdown lock taken in source_init().
 */
static void source_shutdown (source_t *source)
{
    source->running = 0;

    if (source->con == NULL || source->con->ip == NULL) {
        ICECAST_LOG_INFO("Source at \"%s\" exiting", source->mount);
    } else {
        ICECAST_LOG_INFO("Source from %s at \"%s\" exiting", source->con->ip, source->mount);
    }

    event_emit_clientevent("source-disconnect", source->client, source->mount);

    /* we have de-activated the source now, so no more clients will be
     * added, now move the listeners we have to the fallback (if any)
     */
    if (source->fallback_mount)
    {
        source_t *fallback_source;

        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);
        if (fallback_source != NULL)
            source_move_clients(source, fallback_source);
        avl_tree_unlock(global.source_tree);
    }

    /* delete this sources stats */
    stats_event(source->mount, NULL, NULL);

    if (source->client && source->parser) {
        /* For PUT support we check for 100-continue and send back a final 200. */
        const char *expect_hdr = httpp_getvar(source->parser, "expect");

        if (expect_hdr != NULL) {
#ifdef HAVE_STRCASESTR
            if (strcasestr (expect_hdr, "100-continue") != NULL)
#else
            ICECAST_LOG_WARN("OS doesn't support case insensitive substring checks...");
            if (strstr (expect_hdr, "100-continue") != NULL)
#endif
            {
                client_t *detached = source->client;

                source->client = NULL; /* detach client from source. */

                /* NOTE(review): source->client has just been cleared, so the
                 * last argument is always NULL here; possibly the detached
                 * client was meant - confirm against
                 * util_http_build_header() before changing. */
                util_http_build_header(detached->refbuf->data, PER_CLIENT_REFBUF_SIZE, 0, 0, 200, NULL, NULL, NULL, "", NULL, source->client);
                detached->refbuf->len = strlen(detached->refbuf->data);
                refbuf_release(detached->refbuf->next);
                detached->refbuf->next = NULL;
                detached->pos = 0;
                fserve_add_client(detached, NULL);
            }
        }
    }

    /* we don't remove the source from the tree here, it may be a relay and
     * therefore reserved */
    source_clear_source(source);

    global_lock();
    global.sources--;
    stats_event_args(NULL, "sources", "%d", global.sources);
    global_unlock();

    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
}

922

Jack Moffitt's avatar
Jack Moffitt committed
923 924
static int _compare_clients(void *compare_arg, void *a, void *b)
{
925 926
    client_t *clienta = (client_t *) a;
    client_t *clientb = (client_t *) b;
927

928 929
    (void)compare_arg;

930 931
    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
932

933 934
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
935

936
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
937 938
}

939
/* Stub callback: ignores key and always returns 1. */
int source_remove_client(void *key)
{
    (void)key; /* intentionally unused */

    return 1;
}

static int _free_client(void *key)
{
946 947
    client_t *client = (client_t *)key;

948 949 950
    switch (client->respcode) {
        case 0:
            /* if no response has been sent then send a 404 */
951
            client_send_error_by_id(client, ICECAST_ERROR_SOURCE_MOUNT_UNAVAILABLE);
952 953
            break;
        case 500:
954
            client_send_error_by_id(client, ICECAST_ERROR_SOURCE_STREAM_PREPARATION_ERROR);
955 956 957 958 959
            break;
        default:
            client_destroy(client);
            break;
    }
960
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
961
}
962

963
/* Parse a ';'-separated list of "name=value" pairs.
 *
 * For every well-formed pair the value is URL-unescaped, stored under
 * name in source->audio_info and published as a stat on source->mount.
 * Segments that are empty, lack a '=', or have an empty name or value are
 * skipped. Names are limited to 99 characters and values to 199; a longer
 * name makes the segment malformed (skipped), a longer value is truncated.
 *
 * source: the source whose audio_info dictionary and stats are updated.
 * s:      the string to parse; may be NULL or empty, in which case
 *         nothing happens.
 */
static void _parse_audio_info (source_t *source, const char *s)
{
    const char *start = s;

    while (start != NULL && *start != '\0')
    {
        size_t len;

        if ((s = strchr (start, ';')) == NULL)
            len = strlen (start);
        else
        {
            len = (size_t)(s - start);
            s++; /* skip past the ';' */
        }
        if (len)
        {
            char name[100], value[200];

            /* Require both conversions to succeed: otherwise name and/or
             * value would be used uninitialised. ';' is excluded from the
             * name so a key can never run into the following segment. */
            if (sscanf (start, "%99[^=;]=%199[^;\r\n]", name, value) == 2)
            {
                char *esc = util_url_unescape (value);
                if (esc)
                {
                    util_dict_set (source->audio_info, name, esc);
                    stats_event (source->mount, name, esc);
                    free (esc);
                }
            }
        }
        /* s is NULL after the last segment, which ends the loop */
        start = s;
    }
}
994 995


996
/* Apply the mountinfo details to the source */
Philipp Schafft's avatar
Philipp Schafft committed
997
static void source_apply_mount (ice_config_t *config, source_t *source, mount_proxy *mountinfo)
998
{
999
    const char *str;
1000 1001
    int val;
    http_parser_t *parser = NULL;
Philipp Schafft's avatar
Philipp Schafft committed
1002
    acl_t *acl = NULL;
1003

1004
    ICECAST_LOG_DEBUG("Applying mount information for \"%s\"", source->mount);
1005
    avl_tree_rlock (source->client_tree);
1006 1007
    stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);

1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
    if (mountinfo)
    {
        source->max_listeners = mountinfo->max_listeners;
        source->fallback_override = mountinfo->fallback_override;
        source->hidden = mountinfo->hidden;
    }

    /* if a setting is available in the mount details then use it, else
     * check the parser details. */

    if (source->client)
        parser = source->client->parser;

1021 1022 1023 1024
    /* to be done before possible non-utf8 stats */
    if (source->format && source->format->apply_settings)
        source->format->apply_settings (source->client, source->format, mountinfo);

1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046
    /* public */
    if (mountinfo && mountinfo->yp_public >= 0)
        val = mountinfo->yp_public;
    else
    {
        do {
            str = httpp_getvar (parser, "ice-public");
            if (str) break;
            str = httpp_getvar (parser, "icy-pub");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-public");
            if (str) break;
            /* handle header from icecast v2 release */
            str = httpp_getvar (parser, "icy-public");
            if (str) break;
            str = "0";
        } while (0);
        val = atoi (str);
    }
    stats_event_args (source->mount, "public", "%d", val);
    if (source->yp_public != val)
    {
1047
        ICECAST_LOG_DEBUG("YP changed to %d", val);
1048 1049 1050 1051 1052 1053 1054 1055 1056
        if (val)
            yp_add (source->mount);
        else
            yp_remove (source->mount);
        source->yp_public = val;
    }

    /* stream name */
    if (mountinfo && mountinfo->stream_name)
1057
        stats_event (source->mount, "server_name", mountinfo->stream_name);
1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068
    else
    {
        do {
            str = httpp_getvar (parser, "ice-name");
            if (str) break;
            str = httpp_getvar (parser, "icy-name");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-name");
            if (str) break;
            str = "Unspecified name";
        } while (0);
1069 1070
        if (source->format)
            stats_event_conv (source->mount, "server_name", str, source->format->charset);
1071 1072 1073 1074
    }

    /* stream description */
    if (mountinfo && mountinfo->stream_description)
1075
        stats_event (source->mount, "server_description", mountinfo->stream_description);
1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
    else
    {
        do {
            str = httpp_getvar (parser, "ice-description");
            if (str) break;
            str = httpp_getvar (parser, "icy-description");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-description");
            if (str) break;
            str = "Unspecified description";
        } while (0);
1087 1088
        if (source->format)
            stats_event_conv (source->mount, "server_description", str, source->format->charset);
1089 1090 1091 1092
    }

    /* stream URL */
    if (mountinfo && mountinfo->stream_url)
1093
        stats_event (source->mount, "server_url", mountinfo->stream_url);
1094 1095 1096 1097 1098 1099 1100 1101 1102 1103
    else
    {
        do {
            str = httpp_getvar (parser, "ice-url");
            if (str) break;
            str = httpp_getvar (parser, "icy-url");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-url");
            if (str) break;
        } while (0);
1104 1105
        if (str && source->format)
            stats_event_conv (source->mount, "server_url", str, source->format->charset);
1106 1107 1108 1109
    }

    /* stream genre */
    if (mountinfo && mountinfo->stream_genre)
1110
        stats_event (source->mount, "genre", mountinfo->stream_genre);
1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121
    else
    {
        do {
            str = httpp_getvar (parser, "ice-genre");
            if (str) break;
            str = httpp_getvar (parser, "icy-genre");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-genre");
            if (str) break;
            str = "various";
        } while (0);
1122 1123
        if (source->format)
            stats_event_conv (source->mount, "genre", str, source->format->charset);
1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149
    }

    /* stream bitrate */
    if (mountinfo && mountinfo->bitrate)
        str = mountinfo->bitrate;
    else
    {
        do {
            str = httpp_getvar (parser, "ice-bitrate");
            if (str) break;
            str = httpp_getvar (parser, "icy-br");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-bitrate");
        } while (0);
    }
    stats_event (source->mount, "bitrate", str);

    /* handle MIME-type */
    if (mountinfo && mountinfo->type)
        stats_event (source->mount, "server_type", mountinfo->type);
    else
        if (source->format)
            stats_event (source->mount, "server_type", source->format->contenttype);

    if (mountinfo && mountinfo->subtype)
        stats_event (source->mount, "subtype", mountinfo->subtype);
1150

Philipp Schafft's avatar
Philipp Schafft committed
1151
    if (mountinfo)
1152
        acl = auth_stack_get_anonymous_acl(mountinfo->authstack, httpp_req_get);
Philipp Schafft's avatar
Philipp Schafft committed
1153
    if (!acl)
1154
        acl = auth_stack_get_anonymous_acl(config->authstack, httpp_req_get);
Philipp Schafft's avatar
Philipp Schafft committed
1155 1156
    if (acl && acl_test_web(acl) == ACL_POLICY_DENY)
        stats_event (source->mount, "authenticator", "(dummy)");
1157 1158
    else
        stats_event (source->mount, "authenticator", NULL);
Philipp Schafft's avatar
Philipp Schafft committed
1159
    acl_release(acl);
1160

1161 1162 1163
    if (mountinfo && mountinfo->fallback_mount)
    {
        char *mount = source->fallback_mount;
1164
        source->fallback_mount = strdup (mountinfo->fallback_mount);
1165 1166 1167 1168
        free (mount);
    }
    else
        source->fallback_mount = NULL;
1169

1170
    if (mountinfo && mountinfo->dumpfile)
1171
    {
1172
        char *filename = source->dumpfilename;
1173
        source->dumpfilename = strdup (mountinfo->dumpfile);
1174
        free (filename);
1175
    }
1176 1177
    else
        source->dumpfilename = NULL;
1178

1179 1180 1181 1182 1183 1184
    if (source->intro_file)
    {
        fclose (source->intro_file);
        source->intro_file = NULL;
    }
    if (mountinfo && mountinfo->intro_filename)
1185 1186 1187 1188 1189 1190 1191
    {
        ice_config_t *config = config_get_config_unlocked ();
        unsigned int len  = strlen (config->webroot_dir) +
            strlen (mountinfo->intro_filename) + 2;
        char *path = malloc (len);
        if (path)
        {
1192
            FILE *f;
1193 1194 1195
            snprintf (path, len, "%s" PATH_SEPARATOR "%s", config->webroot_dir,
                    mountinfo->intro_filename);

1196 1197 1198 1199
            f = fopen (path, "rb");
            if (f)
                source->intro_file = f;
            else
1200
                ICECAST_LOG_WARN("Cannot open intro file \"%s\": %s", path, strerror(errno));
1201 1202 1203 1204
            free (path);
        }
    }

1205
    if (mountinfo && mountinfo->queue_size_limit)
1206
        source->queue_size_limit = mountinfo->queue_size_limit;
1207

1208
    if (mountinfo && mountinfo->source_timeout)
1209
        source->timeout = mountinfo->source_timeout;
1210

1211
    if (mountinfo && mountinfo->burst_size >= 0)
1212
        source->burst_size = (unsigned int) mountinfo->burst_size;
1213

1214 1215 1216
    if (mountinfo && mountinfo->fallback_when_full)
        source->fallback_when_full = mountinfo->fallback_when_full;

1217 1218 1219
    if (mountinfo && mountinfo->max_history > 0)
        playlist_set_max_tracks(source->history, mountinfo->max_history);

1220
    avl_tree_unlock(source->client_tree);
Karl Heyes's avatar