source.c 43.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
18 19 20 21
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
22
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
23
#include <errno.h>
24 25

#ifndef _WIN32
26
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
27
#include <sys/time.h>
28
#include <sys/socket.h>
29
#include <sys/wait.h>
30
#include <limits.h>
31
#else
32 33
#include <winsock2.h>
#include <windows.h>
34
#define snprintf _snprintf
35
#endif
Jack Moffitt's avatar
Jack Moffitt committed
36

Karl Heyes's avatar
Karl Heyes committed
37 38 39 40
#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
41 42 43 44 45 46

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
47
#include "logging.h"
48
#include "cfgfile.h"
49
#include "util.h"
Jack Moffitt's avatar
Jack Moffitt committed
50
#include "source.h"
Michael Smith's avatar
Michael Smith committed
51
#include "format.h"
52
#include "fserve.h"
Michael Smith's avatar
Michael Smith committed
53
#include "auth.h"
54
#include "compat.h"
Jack Moffitt's avatar
Jack Moffitt committed
55

56 57 58
#undef CATMODULE
#define CATMODULE "source"

Michael Smith's avatar
Michael Smith committed
59 60
#define MAX_FALLBACK_DEPTH 10

61 62
mutex_t move_clients_mutex;

Jack Moffitt's avatar
Jack Moffitt committed
63 64 65
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
66
static void _parse_audio_info (source_t *source, const char *s);
Karl Heyes's avatar
Karl Heyes committed
67
static void source_shutdown (source_t *source);
68 69 70 71 72
#ifdef _WIN32
#define source_run_script(x,y)  WARN0("on [dis]connect scripts disabled");
#else
static void source_run_script (char *command, char *mountpoint);
#endif
Jack Moffitt's avatar
Jack Moffitt committed
73

74 75 76 77 78 79 80 81
/* Allocate a new source with the stated mountpoint, if one already
 * exists with that mountpoint in the global source tree then return
 * NULL.
 */
source_t *source_reserve (const char *mount)
{
    source_t *src = NULL;

82 83 84 85
    if(mount[0] != '/')
        WARN1("Source at \"%s\" does not start with '/', clients will be "
                "unable to connect", mount);

86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
    do
    {
        avl_tree_wlock (global.source_tree);
        src = source_find_mount_raw (mount);
        if (src)
        {
            src = NULL;
            break;
        }

        src = calloc (1, sizeof(source_t));
        if (src == NULL)
            break;

        src->client_tree = avl_tree_new(_compare_clients, NULL);
        src->pending_tree = avl_tree_new(_compare_clients, NULL);

        /* make duplicates for strings or similar */
        src->mount = strdup (mount);
        src->max_listeners = -1;
106
        thread_mutex_create(&src->lock);
107 108 109 110 111 112 113 114 115 116

        avl_insert (global.source_tree, src);

    } while (0);

    avl_tree_unlock (global.source_tree);
    return src;
}


Michael Smith's avatar
Michael Smith committed
117 118 119 120
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
121
{
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
145 146
}

147 148

/* Search for mount, if the mount is there but not currently running then
Karl Heyes's avatar
Karl Heyes committed
149
 * check the fallback, and so on.  Must have a global source lock to call
150 151 152
 * this function.
 */
source_t *source_find_mount (const char *mount)
Michael Smith's avatar
Michael Smith committed
153
{
154
    source_t *source = NULL;
Michael Smith's avatar
Michael Smith committed
155
    ice_config_t *config;
156 157 158 159
    mount_proxy *mountinfo;
    int depth = 0;

    config = config_get_config();
160
    while (mount && depth < MAX_FALLBACK_DEPTH)
161 162
    {
        source = source_find_mount_raw(mount);
Michael Smith's avatar
Michael Smith committed
163

164 165 166 167 168
        if (source)
        {
            if (source->running || source->on_demand)
                break;
        }
Michael Smith's avatar
Michael Smith committed
169

170 171 172
        /* we either have a source which is not active (relay) or no source
         * at all. Check the mounts list for fallback settings
         */
173
        mountinfo = config_find_mount (config, mount, MOUNT_TYPE_NORMAL);
174
        source = NULL;
175 176 177 178

        if (mountinfo == NULL)
            break;
        mount = mountinfo->fallback_mount;
179
        depth++;
Michael Smith's avatar
Michael Smith committed
180 181
    }

182
    config_release_config();
Michael Smith's avatar
Michael Smith committed
183 184 185 186
    return source;
}


Jack Moffitt's avatar
Jack Moffitt committed
187 188
int source_compare_sources(void *arg, void *a, void *b)
{
189 190
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
191

192
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
193 194
}

195 196 197

/* Reset a source to an idle state: destroy the source client, close the
 * dump and intro files, drop every listener and pending client, free the
 * format plugin and the queued stream data, and reset the per-stream
 * settings.  The source structure itself and its mount name are kept.
 *
 * The pending tree write lock is held for the whole operation and the
 * client tree write lock while the listeners are removed.
 */
void source_clear_source (source_t *source)
{
    int c;
    avl_node *node;

    DEBUG1 ("clearing source \"%s\"", source->mount);

    avl_tree_wlock (source->pending_tree);

    /* log bytes read in access log.  This has to be recorded while the
     * source client still exists; once the pointer has been cleared below
     * the condition could never be true. */
    if (source->client && source->format)
        source->client->con->sent_bytes = source->format->read_bytes;

    client_destroy(source->client);
    source->client = NULL;
    source->parser = NULL;
    source->con = NULL;

    if (source->dumpfile)
    {
        INFO1 ("Closing dumpfile for %s", source->mount);
        fclose (source->dumpfile);
        source->dumpfile = NULL;
    }

    /* lets kick off any clients that are left on here */
    avl_tree_wlock (source->client_tree);
    c = 0;
    while ((node = avl_get_first (source->client_tree)) != NULL)
    {
        client_t *client = node->key;

        if (client->respcode == 200)
            c++; /* only count clients that have had some processing */
        avl_delete (source->client_tree, client, _free_client);
    }
    if (c)
    {
        stats_event_sub (NULL, "listeners", source->listeners);
        INFO2 ("%d active listeners on %s released", c, source->mount);
    }
    avl_tree_unlock (source->client_tree);

    /* clients that were still waiting to be added are dropped as well */
    while ((node = avl_get_first (source->pending_tree)) != NULL)
        avl_delete (source->pending_tree, node->key, _free_client);

    if (source->format && source->format->free_plugin)
        source->format->free_plugin (source->format);
    source->format = NULL;

    /* Lets clear out the source queue too */
    while (source->stream_data)
    {
        refbuf_t *p = source->stream_data;
        source->stream_data = p->next;
        p->next = NULL;
        /* can be referenced by burst handler as well */
        while (p->_count > 1)
            refbuf_release (p);
        refbuf_release (p);
    }
    source->stream_data_tail = NULL;

    source->burst_point = NULL;
    source->burst_size = 0;
    source->burst_offset = 0;
    source->queue_size = 0;
    source->queue_size_limit = 0;
    source->listeners = 0;
    source->max_listeners = -1;
    source->prev_listeners = 0;
    source->hidden = 0;
    source->shoutcast_compat = 0;
    source->client_stats_update = 0;
    util_dict_free (source->audio_info);
    source->audio_info = NULL;

    free(source->fallback_mount);
    source->fallback_mount = NULL;

    free(source->dumpfilename);
    source->dumpfilename = NULL;

    if (source->intro_file)
    {
        fclose (source->intro_file);
        source->intro_file = NULL;
    }

    source->on_demand_req = 0;
    avl_tree_unlock (source->pending_tree);
}


296
/* Remove the provided source from the global tree and free it */
297
void source_free_source (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
298
{
299 300 301 302 303
    DEBUG1 ("freeing source \"%s\"", source->mount);
    avl_tree_wlock (global.source_tree);
    avl_delete (global.source_tree, source, NULL);
    avl_tree_unlock (global.source_tree);

304 305
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
306

307 308 309
    /* make sure all YP entries have gone */
    yp_remove (source->mount);

310 311
    free (source->mount);
    free (source);
Jack Moffitt's avatar
Jack Moffitt committed
312

313
    return;
Jack Moffitt's avatar
Jack Moffitt committed
314
}
315

316

317 318 319
/* Look up a listener in the source's client tree using a temporary client
 * whose connection carries the requested id as the search key (presumably
 * the tree comparator orders clients by connection id - confirm against
 * _compare_clients).  The client tree is read-locked during the lookup.
 *
 * Returns the matching client, or NULL if the lookup fails.
 */
client_t *source_find_client(source_t *source, int id)
{
    connection_t probe_con;
    client_t probe;
    void *match;
    client_t *found = NULL;

    /* only the connection id of the probe is filled in */
    probe.con = &probe_con;
    probe_con.id = id;

    avl_tree_rlock (source->client_tree);
    if (avl_get_by_key (source->client_tree, &probe, &match) == 0)
        found = match;
    avl_tree_unlock (source->client_tree);

    return found;
}
336

337

338 339 340 341 342
/* Move clients from source to dest provided dest is running
 * and that the stream format is the same.
 * The only lock that should be held when this is called is the
 * source tree lock
 */
343 344
void source_move_clients (source_t *source, source_t *dest)
{
345
    unsigned long count = 0;
346 347 348 349 350
    if (strcmp (source->mount, dest->mount) == 0)
    {
        WARN1 ("src and dst are the same \"%s\", skipping", source->mount);
        return;
    }
351 352 353 354
    /* we don't want the two write locks to deadlock in here */
    thread_mutex_lock (&move_clients_mutex);

    /* if the destination is not running then we can't move clients */
355

356
    avl_tree_wlock (dest->pending_tree);
357
    if (dest->running == 0 && dest->on_demand == 0)
358
    {
359
        WARN1 ("destination mount %s not running, unable to move clients ", dest->mount);
360
        avl_tree_unlock (dest->pending_tree);
361
        thread_mutex_unlock (&move_clients_mutex);
362 363 364
        return;
    }

365
    do
366
    {
367
        client_t *client;
368

369 370
        /* we need to move the client and pending trees - we must take the
         * locks in this order to avoid deadlocks */
371
        avl_tree_wlock (source->pending_tree);
372
        avl_tree_wlock (source->client_tree);
373

374
        if (source->on_demand == 0 && source->format == NULL)
375 376
        {
            INFO1 ("source mount %s is not available", source->mount);
377
            break;
378
        }
379
        if (source->format && dest->format)
380
        {
381 382 383 384 385
            if (source->format->type != dest->format->type)
            {
                WARN2 ("stream %s and %s are of different types, ignored", source->mount, dest->mount);
                break;
            }
386
        }
387

388 389 390 391 392 393 394
        while (1)
        {
            avl_node *node = avl_get_first (source->pending_tree);
            if (node == NULL)
                break;
            client = (client_t *)(node->key);
            avl_delete (source->pending_tree, client, NULL);
395

396 397 398 399 400 401 402 403
            /* when switching a client to a different queue, be wary of the 
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
404 405
                if (source->con == NULL)
                    client->intro_offset = -1;
406 407
            }

408
            avl_insert (dest->pending_tree, (void *)client);
409
            count++;
410 411 412 413 414 415 416 417 418 419 420
        }

        while (1)
        {
            avl_node *node = avl_get_first (source->client_tree);
            if (node == NULL)
                break;

            client = (client_t *)(node->key);
            avl_delete (source->client_tree, client, NULL);

421 422 423 424 425 426 427 428
            /* when switching a client to a different queue, be wary of the 
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
429 430
                if (source->con == NULL)
                    client->intro_offset = -1;
431
            }
432
            avl_insert (dest->pending_tree, (void *)client);
433
            count++;
434
        }
435 436
        INFO2 ("passing %lu listeners to \"%s\"", count, dest->mount);

437 438 439 440 441
        source->listeners = 0;
        stats_event (source->mount, "listeners", "0");

    } while (0);

442 443 444
    avl_tree_unlock (source->pending_tree);
    avl_tree_unlock (source->client_tree);

445 446 447 448
    /* see if we need to wake up an on-demand relay */
    if (dest->running == 0 && dest->on_demand && count)
        dest->on_demand_req = 1;

449 450 451 452
    avl_tree_unlock (dest->pending_tree);
    thread_mutex_unlock (&move_clients_mutex);
}

453

Karl Heyes's avatar
Karl Heyes committed
454 455 456 457 458 459 460 461 462 463 464 465 466
/* Get some data from the source.  The stream data is placed in a refbuf and
 * handed back; NULL is also a valid result, e.g. when the (short) wait ran
 * out with no data pending, or when the source has been marked as stopped.
 *
 * As a side effect the byte counters are published to the stats engine
 * every 5 seconds, and source->running is cleared on a socket error, on a
 * read timeout, or when the source connection reports an error.
 */
static refbuf_t *get_next_buffer (source_t *source)
{
    refbuf_t *buffer = NULL;
    /* wait up to 250ms for input unless a short delay was requested */
    int wait_ms = source->short_delay ? 0 : 250;

    while (global.running == ICE_RUNNING && source->running)
    {
        time_t now = time (NULL);
        int ready = 0;

        if (source->client == NULL)
        {
            /* no source client to wait on, just pause and treat it as
             * activity so the timeout check below does not trigger */
            thread_sleep (wait_ms*1000);
            source->last_read = now;
        }
        else
            ready = util_timed_wait_for_fd (source->con->sock, wait_ms);

        if (now >= source->client_stats_update)
        {
            stats_event_args (source->mount, "total_bytes_read",
                    "%"PRIu64, source->format->read_bytes);
            stats_event_args (source->mount, "total_bytes_sent",
                    "%"PRIu64, source->format->sent_bytes);
            source->client_stats_update = now + 5;
        }

        if (ready < 0)
        {
            if (! sock_recoverable (sock_error()))
            {
                WARN0 ("Error while waiting on socket, Disconnecting source");
                source->running = 0;
            }
            break;
        }

        if (ready == 0)
        {
            /* nothing to read, check whether the source has gone quiet for
             * longer than its timeout */
            thread_mutex_lock(&source->lock);
            if (now > (source->last_read + (time_t)source->timeout))
            {
                DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read,
                        source->timeout, (long)now);
                WARN0 ("Disconnecting source due to socket timeout");
                source->running = 0;
            }
            thread_mutex_unlock(&source->lock);
            break;
        }

        source->last_read = now;
        buffer = source->format->get_buffer (source);
        if (source->client->con && source->client->con->error)
        {
            INFO1 ("End of Stream %s", source->mount);
            source->running = 0;
            continue;   /* the loop condition ends the loop */
        }
        if (buffer != NULL)
            break;
    }

    return buffer;
}


/* General send routine per listener.  Data is written to the client in up
 * to 10 passes or until more than 20000 bytes have gone out, whichever
 * comes first, and the total is added to the format's sent_bytes counter.
 *
 * deletion_expected tells us whether the buffer at the head of the queue is
 * about to disappear; if this client is still referring to it after writing
 * then the client is flagged with an error as it has fallen too far behind.
 */
static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
{
    int passes_left = 10;   /* max number of iterations in one go */
    int sent = 0;

    for (;;)
    {
        int written;

        /* check for limited listener time */
        if (client->con->discon_time && time(NULL) >= client->con->discon_time)
        {
            INFO1 ("time limit reached for client #%lu", client->con->id);
            client->con->error = 1;
        }

        /* jump out if client connection has died */
        if (client->con->error)
            break;

        /* lets not send too much to one client in one go, but don't
           sleep for too long if more data can be sent */
        if (sent > 20000 || passes_left == 0)
        {
            if (client->check_buffer != format_check_file_buffer)
                source->short_delay = 1;
            break;
        }
        passes_left--;

        if (client->check_buffer (source, client) < 0)
            break;

        written = client->write_to_client (client);
        if (written <= 0)
            break;  /* can't write any more */

        sent += written;
    }
    source->format->sent_bytes += sent;

    /* the refbuf referenced at head (last in queue) may be marked for deletion
     * if so, check to see if this client is still referring to it */
    if (deletion_expected && client->refbuf && client->refbuf == source->stream_data)
    {
        INFO2 ("Client %lu (%s) has fallen too far behind, removing",
                client->con->id, client->con->ip);
        stats_event_inc (source->mount, "slow_listeners");
        client->con->error = 1;
    }
}

Jack Moffitt's avatar
Jack Moffitt committed
582

583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605
/* Open the file for stream dumping.
 * This function should do all processing of the filename.
 *
 * On non-Windows builds the filename is used as a strftime() format and
 * expanded with the current local time, so it may contain conversion
 * specifiers.  The file is opened in binary append mode.
 *
 * Returns the opened stream, or NULL on failure with errno describing the
 * problem: ENAMETOOLONG when the expanded name does not fit in PATH_MAX,
 * ENOENT when the supplied name is empty, otherwise whatever localtime() or
 * fopen() reported.
 */
static FILE * source_open_dumpfile(const char * filename) {
#ifndef _WIN32
    /* some of the below functions seems not to be standard winapi functions */
    char buffer[PATH_MAX];
    time_t curtime;
    struct tm *loctime;

    /* Get the current time. */
    curtime = time (NULL);

    /* Convert it to local time representation. */
    loctime = localtime (&curtime);
    if (loctime == NULL)
        return NULL;

    /* strftime returns 0 when the result does not fit and then leaves the
     * buffer contents unspecified, so the buffer must not be handed to
     * fopen in that case */
    if (strftime (buffer, sizeof(buffer), filename, loctime) == 0)
    {
        errno = (filename[0] != '\0') ? ENAMETOOLONG : ENOENT;
        return NULL;
    }
    filename = buffer;
#endif

    return fopen (filename, "ab");
}

Karl Heyes's avatar
Karl Heyes committed
606 607 608
/* Perform any initialisation just before the stream data is processed, the header
 * info is processed by now and the format details are setup
 */
609
static void source_init (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
610
{
611
    ice_config_t *config = config_get_config();
612 613
    char *listenurl;
    const char *str;
614
    int listen_url_size;
615
    mount_proxy *mountinfo;
616

617
    /* 6 for max size of port */
618
    listen_url_size = strlen("http://") + strlen(config->hostname) +
619
        strlen(":") + 6 + strlen(source->mount) + 1;
620 621 622

    listenurl = malloc (listen_url_size);
    memset (listenurl, '\000', listen_url_size);
623 624
    snprintf (listenurl, listen_url_size, "http://%s:%d%s",
            config->hostname, config->port, source->mount);
625 626
    config_release_config();

627 628 629 630 631 632 633 634
    str = httpp_getvar(source->parser, "ice-audio-info");
    source->audio_info = util_dict_new();
    if (str)
    {
        _parse_audio_info (source, str);
        stats_event (source->mount, "audio_info", str);
    }

635 636
    stats_event (source->mount, "listenurl", listenurl);

Michael Smith's avatar
Michael Smith committed
637
    free(listenurl);
638

639 640
    if (source->dumpfilename != NULL)
    {
641
        source->dumpfile = source_open_dumpfile (source->dumpfilename);
642 643 644 645 646 647 648
        if (source->dumpfile == NULL)
        {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    source->dumpfilename, strerror(errno));
        }
    }

649 650 651 652 653 654
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock (source->shutdown_rwlock);

    /* start off the statistics */
    source->listeners = 0;
    stats_event_inc (NULL, "source_total_connections");
655
    stats_event (source->mount, "slow_listeners", "0");
656
    stats_event_args (source->mount, "listeners", "%lu", source->listeners);
657
    stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
Karl Heyes's avatar
Karl Heyes committed
658
    stats_event_time (source->mount, "stream_start");
659
    stats_event_time_iso8601 (source->mount, "stream_start_iso8601");
660

661
    DEBUG0("Source creation complete");
Karl Heyes's avatar
Karl Heyes committed
662
    source->last_read = time (NULL);
663
    source->prev_listeners = -1;
664
    source->running = 1;
665

666
    mountinfo = config_find_mount (config_get_config(), source->mount, MOUNT_TYPE_NORMAL);
667 668 669 670
    if (mountinfo)
    {
        if (mountinfo->on_connect)
            source_run_script (mountinfo->on_connect, source->mount);
671
        auth_stream_start (mountinfo, source->mount);
672
    }
673 674
    config_release_config();

Michael Smith's avatar
Michael Smith committed
675 676
    /*
    ** Now, if we have a fallback source and override is on, we want
677
    ** to steal its clients, because it means we've come back online
Michael Smith's avatar
Michael Smith committed
678 679 680 681
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

682 683 684 685
    if (source->fallback_override && source->fallback_mount)
    {
        source_t *fallback_source;

Michael Smith's avatar
Michael Smith committed
686 687 688
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);

689 690
        if (fallback_source)
            source_move_clients (fallback_source, source);
Michael Smith's avatar
Michael Smith committed
691

692
        avl_tree_unlock(global.source_tree);
Michael Smith's avatar
Michael Smith committed
693
    }
694 695 696 697 698
}


/* Main loop of a source.  After source_init() it repeats, for as long as
 * both the server and the source are running:
 *   - fetch the next buffer and append it to the stream queue, moving the
 *     burst point forward and writing the buffer to the dump file,
 *   - send queued data to every listener and drop those flagged with an
 *     error,
 *   - admit pending clients (subject to max_listeners),
 *   - publish listener stats when the count changed,
 *   - release queue buffers nobody refers to any more.
 * source_shutdown() is called once the loop ends.
 */
void source_main (source_t *source)
{
    source_init (source);

    while (global.running == ICE_RUNNING && source->running)
    {
        avl_node *node;
        int remove_from_q = 0;
        refbuf_t *refbuf = get_next_buffer (source);

        source->short_delay = 0;

        if (refbuf)
        {
            /* append buffer to the in-flight data queue,  */
            if (source->stream_data == NULL)
            {
                source->stream_data = refbuf;
                source->burst_point = refbuf;
            }
            if (source->stream_data_tail)
                source->stream_data_tail->next = refbuf;
            source->stream_data_tail = refbuf;
            source->queue_size += refbuf->len;
            /* new buffer is referenced for burst */
            refbuf_addref (refbuf);

            /* new data on queue, so check the burst point */
            source->burst_offset += refbuf->len;
            while (source->burst_offset > source->burst_size)
            {
                refbuf_t *to_release = source->burst_point;

                /* never move the burst point past the end of the queue */
                if (to_release->next == NULL)
                    break;

                source->burst_point = to_release->next;
                source->burst_offset -= to_release->len;
                refbuf_release (to_release);
            }

            /* save stream to file */
            if (source->dumpfile && source->format->write_buf_to_file)
                source->format->write_buf_to_file (source, refbuf);
        }

        /* lets see if we have too much data in the queue, but don't remove it until later */
        thread_mutex_lock(&source->lock);
        if (source->queue_size > source->queue_size_limit)
            remove_from_q = 1;
        thread_mutex_unlock(&source->lock);

        /* acquire write lock on pending_tree, then on client_tree */
        avl_tree_wlock(source->pending_tree);
        avl_tree_wlock(source->client_tree);

        /* feed the listeners, removing the ones that have failed */
        node = avl_get_first(source->client_tree);
        while (node)
        {
            client_t *listener = (client_t *)node->key;

            send_to_listener (source, listener, remove_from_q);

            /* step on first, the current node may be deleted below */
            node = avl_get_next(node);

            if (listener->con->error)
            {
                if (listener->respcode == 200)
                    stats_event_dec (NULL, "listeners");
                avl_delete(source->client_tree, (void *)listener, _free_client);
                source->listeners--;
                DEBUG0("Client removed");
            }
        }

        /** add pending clients **/
        node = avl_get_first(source->pending_tree);
        while (node)
        {
            client_t *waiting = (client_t *)node->key;
            int mount_full = source->max_listeners != -1 &&
                    source->listeners >= (unsigned long)source->max_listeners;

            node = avl_get_next(node);

            if (mount_full)
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                avl_delete(source->pending_tree, (void *)waiting, _free_client);

                INFO0("Client deleted, exceeding maximum listeners for this "
                        "mountpoint.");
                continue;
            }

            /* Otherwise, the client is accepted, add it */
            avl_insert(source->client_tree, waiting);

            source->listeners++;
            DEBUG0("Client added");
            stats_event_inc(source->mount, "connections");
        }

        /** clear pending tree **/
        while ((node = avl_get_first(source->pending_tree)) != NULL)
        {
            avl_delete(source->pending_tree, node->key, source_remove_client);
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* update the stats if need be */
        if (source->listeners != source->prev_listeners)
        {
            source->prev_listeners = source->listeners;
            INFO2("listener count on %s now %lu", source->mount, source->listeners);
            if (source->listeners > source->peak_listeners)
            {
                source->peak_listeners = source->listeners;
                stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
            }
            stats_event_args (source->mount, "listeners", "%lu", source->listeners);
            /* an on-demand source with nobody listening is stopped */
            if (source->listeners == 0 && source->on_demand)
                source->running = 0;
        }

        /* lets reduce the queue, any lagging clients should of been
         * terminated by now
         */
        if (source->stream_data)
        {
            /* normal unreferenced queue data will have a refcount 1, but
             * burst queue data will be at least 2, active clients will also
             * increase refcount */
            while (source->stream_data->_count == 1)
            {
                refbuf_t *to_go = source->stream_data;

                if (to_go->next == NULL || source->burst_point == to_go)
                {
                    /* this should not happen */
                    ERROR0 ("queue state is unexpected");
                    source->running = 0;
                    break;
                }
                source->stream_data = to_go->next;
                source->queue_size -= to_go->len;
                to_go->next = NULL;
                refbuf_release (to_go);
            }
        }

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }

    source_shutdown (source);
}
Jack Moffitt's avatar
Jack Moffitt committed
865

Michael Smith's avatar
Michael Smith committed
866

Karl Heyes's avatar
Karl Heyes committed
867 868
/* Tear down a source that has finished running.
 *
 * In order: marks the source as not running, runs the mount's
 * on_disconnect script (when one is configured) and signals the end of
 * the stream to auth, moves the listeners to the fallback mount if one is
 * named and can be found, drops the stats for the mount, clears the
 * source, decrements global.sources and finally unlocks
 * source->shutdown_rwlock.
 */
static void source_shutdown (source_t *source)
{
    mount_proxy *mount_cfg;

    source->running = 0;
    INFO1("Source \"%s\" exiting", source->mount);

    /* the mount settings are only used between config_get_config() and
     * config_release_config() */
    mount_cfg = config_find_mount (config_get_config(), source->mount, MOUNT_TYPE_NORMAL);
    if (mount_cfg != NULL)
    {
        if (mount_cfg->on_disconnect)
            source_run_script (mount_cfg->on_disconnect, source->mount);
        auth_stream_end (mount_cfg, source->mount);
    }
    config_release_config();

    /* the source has been de-activated at this point, so no more clients
     * will be added; hand the listeners we still have over to the
     * fallback mount (if any)
     */
    if (source->fallback_mount != NULL)
    {
        source_t *target;

        /* keep the source tree read-locked for both the lookup and the move */
        avl_tree_rlock(global.source_tree);
        target = source_find_mount (source->fallback_mount);
        if (target != NULL)
            source_move_clients (source, target);
        avl_tree_unlock (global.source_tree);
    }

    /* delete the stats belonging to this source */
    stats_event(source->mount, NULL, NULL);

    /* the source is not removed from the tree here, it may be a relay
     * and therefore reserved */
    source_clear_source (source);

    /* one source less, counted and reported under the global lock */
    global_lock();
    global.sources--;
    stats_event_args (NULL, "sources", "%d", global.sources);
    global_unlock();

    /* release our hold on the lock so the main thread can continue
     * cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
}

915

Jack Moffitt's avatar
Jack Moffitt committed
916 917
/* Three-way comparison of two clients by the id of their connections.
 *
 * a and b are client_t pointers; compare_arg is not used.
 * Returns -1 when a's connection id is lower than b's, 1 when it is
 * higher and 0 when both are equal.
 * NOTE(review): presumably the key comparison for the client tree - confirm
 * against the place where the tree is created.
 */
static int _compare_clients(void *compare_arg, void *a, void *b)
{
    const client_t *lhs = a;
    const client_t *rhs = b;
    const connection_t *lhs_con = lhs->con;
    const connection_t *rhs_con = rhs->con;

    (void)compare_arg;

    /* each relational test yields 0 or 1, so this gives -1, 0 or 1 */
    return (lhs_con->id > rhs_con->id) - (lhs_con->id < rhs_con->id);
}

930
/* Does nothing with key and always returns 1. */
int source_remove_client(void *key)
{
    (void)key;   /* deliberately unused */

    return 1;
}

static int _free_client(void *key)
{
937 938
    client_t *client = (client_t *)key;

939 940 941 942 943
    /* if no response has been sent then send a 404 */
    if (client->respcode == 0)
        client_send_404 (client, "Mount unavailable");
    else
        client_destroy(client);
944 945
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
946
}
947

948
/* Parse a ';' separated list of name=value pairs.
 *
 * For every well formed pair the value is URL-unescaped, stored in
 * source->audio_info under the name and reported as a stat of
 * source->mount with the same name.
 *
 * s may be NULL or empty, in which case nothing is done.  Segments that
 * are empty, have no '=', have an empty name or an empty value are
 * skipped; names are cut at 99 and values at 199 characters.
 */
static void _parse_audio_info (source_t *source, const char *s)
{
    const char *start = s;

    while (start != NULL && *start != '\0')
    {
        const char *sep = strchr (start, ';');
        /* resume just past the ';', or stop when this is the last segment */
        const char *next = sep ? sep + 1 : NULL;

        /* sep == start means an empty segment such as the middle of ";;" */
        if (sep != start)
        {
            char name[100], value[200];

            /* Both fields have to be converted before they are used,
             * otherwise value (and possibly name) would be read while
             * uninitialised or still holding the previous pair.  The name
             * scanset also stops at ';' so that a segment without '='
             * cannot pick up text from the segment that follows it.
             */
            if (sscanf (start, "%99[^=;]=%199[^;\r\n]", name, value) == 2)
            {
                char *esc = util_url_unescape (value);

                if (esc)
                {
                    util_dict_set (source->audio_info, name, esc);
                    stats_event (source->mount, name, esc);
                    free (esc);
                }
            }
        }
        start = next;
    }
}
979 980


981
/* Apply the mountinfo details to the source */
982
static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
983
{
984
    const char *str;
985 986 987
    int val;
    http_parser_t *parser = NULL;

988
    DEBUG1("Applying mount information for \"%s\"", source->mount);
989
    avl_tree_rlock (source->client_tree);
990 991
    stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);

992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004
    if (mountinfo)
    {
        source->max_listeners = mountinfo->max_listeners;
        source->fallback_override = mountinfo->fallback_override;
        source->hidden = mountinfo->hidden;
    }

    /* if a setting is available in the mount details then use it, else
     * check the parser details. */

    if (source->client)
        parser = source->client->parser;

1005 1006 1007 1008
    /* to be done before possible non-utf8 stats */
    if (source->format && source->format->apply_settings)
        source->format->apply_settings (source->client, source->format, mountinfo);

1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040
    /* public */
    if (mountinfo && mountinfo->yp_public >= 0)
        val = mountinfo->yp_public;
    else
    {
        do {
            str = httpp_getvar (parser, "ice-public");
            if (str) break;
            str = httpp_getvar (parser, "icy-pub");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-public");
            if (str) break;
            /* handle header from icecast v2 release */
            str = httpp_getvar (parser, "icy-public");
            if (str) break;
            str = "0";
        } while (0);
        val = atoi (str);
    }
    stats_event_args (source->mount, "public", "%d", val);
    if (source->yp_public != val)
    {
        DEBUG1 ("YP changed to %d", val);
        if (val)
            yp_add (source->mount);
        else
            yp_remove (source->mount);
        source->yp_public = val;
    }

    /* stream name */
    if (mountinfo && mountinfo->stream_name)
1041
        stats_event (source->mount, "server_name", mountinfo->stream_name);
1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052
    else
    {
        do {
            str = httpp_getvar (parser, "ice-name");
            if (str) break;
            str = httpp_getvar (parser, "icy-name");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-name");
            if (str) break;
            str = "Unspecified name";
        } while (0);
1053 1054
        if (source->format)
            stats_event_conv (source->mount, "server_name", str, source->format->charset);
1055 1056 1057