source.c 43.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 15 16 17
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
18 19 20 21
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
22
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
23
#include <errno.h>
24 25

#ifndef _WIN32
26
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
27
#include <sys/time.h>
28
#include <sys/socket.h>
29
#include <sys/wait.h>
30
#include <limits.h>
31
#else
32 33
#include <winsock2.h>
#include <windows.h>
34
#define snprintf _snprintf
35
#endif
Jack Moffitt's avatar
Jack Moffitt committed
36

Karl Heyes's avatar
Karl Heyes committed
37 38 39 40
#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
41 42 43 44 45 46

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
47
#include "logging.h"
48
#include "cfgfile.h"
49
#include "util.h"
Jack Moffitt's avatar
Jack Moffitt committed
50
#include "source.h"
Michael Smith's avatar
Michael Smith committed
51
#include "format.h"
52
#include "fserve.h"
Michael Smith's avatar
Michael Smith committed
53
#include "auth.h"
54
#include "compat.h"
Jack Moffitt's avatar
Jack Moffitt committed
55

56 57 58
#undef CATMODULE
#define CATMODULE "source"

Michael Smith's avatar
Michael Smith committed
59 60
#define MAX_FALLBACK_DEPTH 10

61 62
mutex_t move_clients_mutex;

Jack Moffitt's avatar
Jack Moffitt committed
63 64 65
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
66
static void _parse_audio_info (source_t *source, const char *s);
Karl Heyes's avatar
Karl Heyes committed
67
static void source_shutdown (source_t *source);
68 69 70 71 72
#ifdef _WIN32
#define source_run_script(x,y)  WARN0("on [dis]connect scripts disabled");
#else
static void source_run_script (char *command, char *mountpoint);
#endif
Jack Moffitt's avatar
Jack Moffitt committed
73

74 75 76 77 78 79 80 81
/* Allocate a new source with the stated mountpoint, if one already
 * exists with that mountpoint in the global source tree then return
 * NULL.
 */
source_t *source_reserve (const char *mount)
{
    source_t *src = NULL;

82 83 84 85
    if(mount[0] != '/')
        WARN1("Source at \"%s\" does not start with '/', clients will be "
                "unable to connect", mount);

86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
    do
    {
        avl_tree_wlock (global.source_tree);
        src = source_find_mount_raw (mount);
        if (src)
        {
            src = NULL;
            break;
        }

        src = calloc (1, sizeof(source_t));
        if (src == NULL)
            break;

        src->client_tree = avl_tree_new(_compare_clients, NULL);
        src->pending_tree = avl_tree_new(_compare_clients, NULL);

        /* make duplicates for strings or similar */
        src->mount = strdup (mount);
        src->max_listeners = -1;
106
        thread_mutex_create(&src->lock);
107 108 109 110 111 112 113 114 115 116

        avl_insert (global.source_tree, src);

    } while (0);

    avl_tree_unlock (global.source_tree);
    return src;
}


Michael Smith's avatar
Michael Smith committed
117 118 119 120
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
121
{
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
145 146
}

147 148

/* Search for mount, if the mount is there but not currently running then
Karl Heyes's avatar
Karl Heyes committed
149
 * check the fallback, and so on.  Must have a global source lock to call
150 151 152
 * this function.
 */
source_t *source_find_mount (const char *mount)
Michael Smith's avatar
Michael Smith committed
153
{
154
    source_t *source = NULL;
Michael Smith's avatar
Michael Smith committed
155
    ice_config_t *config;
156 157 158 159
    mount_proxy *mountinfo;
    int depth = 0;

    config = config_get_config();
160
    while (mount && depth < MAX_FALLBACK_DEPTH)
161 162
    {
        source = source_find_mount_raw(mount);
Michael Smith's avatar
Michael Smith committed
163

164 165 166 167 168
        if (source)
        {
            if (source->running || source->on_demand)
                break;
        }
Michael Smith's avatar
Michael Smith committed
169

170 171 172
        /* we either have a source which is not active (relay) or no source
         * at all. Check the mounts list for fallback settings
         */
173
        mountinfo = config_find_mount (config, mount, MOUNT_TYPE_NORMAL);
174
        source = NULL;
175 176 177 178

        if (mountinfo == NULL)
            break;
        mount = mountinfo->fallback_mount;
179
        depth++;
Michael Smith's avatar
Michael Smith committed
180 181
    }

182
    config_release_config();
Michael Smith's avatar
Michael Smith committed
183 184 185 186
    return source;
}


Jack Moffitt's avatar
Jack Moffitt committed
187 188
int source_compare_sources(void *arg, void *a, void *b)
{
189 190
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
191

192
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
193 194
}

195 196 197

/* Return a source to its idle state: destroy the source client, close the
 * dump and intro files, drop every listener (active and pending), free the
 * format plugin and the queued stream data, and reset the per-stream
 * settings.  The source structure itself and its mount name are kept.
 *
 * The pending tree write lock is held for the whole call and the client
 * tree write lock while the active listeners are removed.
 */
void source_clear_source (source_t *source)
{
    int c;
    avl_node *node;

    DEBUG1 ("clearing source \"%s\"", source->mount);

    avl_tree_wlock (source->pending_tree);

    /* log bytes read in access log.  This has to be done while the client
     * still exists; once it is destroyed and the pointer cleared there is
     * nothing left to record the count on.
     */
    if (source->client && source->client->con && source->format)
        source->client->con->sent_bytes = source->format->read_bytes;

    client_destroy(source->client);
    source->client = NULL;
    source->parser = NULL;
    source->con = NULL;

    if (source->dumpfile)
    {
        INFO1 ("Closing dumpfile for %s", source->mount);
        fclose (source->dumpfile);
        source->dumpfile = NULL;
    }

    /* lets kick off any clients that are left on here */
    avl_tree_wlock (source->client_tree);
    c = 0;
    while ((node = avl_get_first (source->client_tree)) != NULL)
    {
        client_t *client = node->key;

        if (client->respcode == 200)
            c++; /* only count clients that have had some processing */
        avl_delete (source->client_tree, client, _free_client);
    }
    if (c)
    {
        stats_event_sub (NULL, "listeners", source->listeners);
        INFO2 ("%d active listeners on %s released", c, source->mount);
    }
    avl_tree_unlock (source->client_tree);

    /* listeners that never made it onto the client tree go as well */
    while ((node = avl_get_first (source->pending_tree)) != NULL)
        avl_delete (source->pending_tree, node->key, _free_client);

    if (source->format && source->format->free_plugin)
        source->format->free_plugin (source->format);
    source->format = NULL;

    /* Lets clear out the source queue too */
    while (source->stream_data)
    {
        refbuf_t *p = source->stream_data;

        source->stream_data = p->next;
        p->next = NULL;
        /* can be referenced by burst handler as well */
        while (p->_count > 1)
            refbuf_release (p);
        refbuf_release (p);
    }
    source->stream_data_tail = NULL;

    source->burst_point = NULL;
    source->burst_size = 0;
    source->burst_offset = 0;
    source->queue_size = 0;
    source->queue_size_limit = 0;
    source->listeners = 0;
    source->max_listeners = -1;
    source->prev_listeners = 0;
    source->hidden = 0;
    source->shoutcast_compat = 0;
    source->client_stats_update = 0;
    util_dict_free (source->audio_info);
    source->audio_info = NULL;

    free(source->fallback_mount);
    source->fallback_mount = NULL;

    free(source->dumpfilename);
    source->dumpfilename = NULL;

    if (source->intro_file)
    {
        fclose (source->intro_file);
        source->intro_file = NULL;
    }

    source->on_demand_req = 0;
    avl_tree_unlock (source->pending_tree);
}


296
/* Remove the provided source from the global tree and free it */
297
void source_free_source (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
298
{
299 300 301 302 303
    DEBUG1 ("freeing source \"%s\"", source->mount);
    avl_tree_wlock (global.source_tree);
    avl_delete (global.source_tree, source, NULL);
    avl_tree_unlock (global.source_tree);

304 305
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
306

307 308 309
    /* make sure all YP entries have gone */
    yp_remove (source->mount);

310 311
    free (source->mount);
    free (source);
Jack Moffitt's avatar
Jack Moffitt committed
312

313
    return;
Jack Moffitt's avatar
Jack Moffitt committed
314
}
315

316

317 318 319
/* Find the listener on this source whose connection id matches.  The client
 * tree is read-locked for the duration of the lookup only.  Returns NULL if
 * no such client is on the tree.
 */
client_t *source_find_client(source_t *source, int id)
{
    connection_t probe_con;
    client_t probe;
    void *match = NULL;
    int rc;

    /* build a throwaway client carrying just the id to use as search key */
    probe_con.id = id;
    probe.con = &probe_con;

    avl_tree_rlock (source->client_tree);
    rc = avl_get_by_key (source->client_tree, &probe, &match);
    avl_tree_unlock (source->client_tree);

    if (rc != 0)
        return NULL;
    return match;
}
336

337

338 339 340 341 342
/* Move clients from source to dest provided dest is running
 * and that the stream format is the same.
 * The only lock that should be held when this is called is the
 * source tree lock
 */
343 344
void source_move_clients (source_t *source, source_t *dest)
{
345
    unsigned long count = 0;
346 347 348 349 350
    if (strcmp (source->mount, dest->mount) == 0)
    {
        WARN1 ("src and dst are the same \"%s\", skipping", source->mount);
        return;
    }
351 352 353 354
    /* we don't want the two write locks to deadlock in here */
    thread_mutex_lock (&move_clients_mutex);

    /* if the destination is not running then we can't move clients */
355

356
    avl_tree_wlock (dest->pending_tree);
357
    if (dest->running == 0 && dest->on_demand == 0)
358
    {
359
        WARN1 ("destination mount %s not running, unable to move clients ", dest->mount);
360
        avl_tree_unlock (dest->pending_tree);
361
        thread_mutex_unlock (&move_clients_mutex);
362 363 364
        return;
    }

365
    do
366
    {
367
        client_t *client;
368

369 370
        /* we need to move the client and pending trees - we must take the
         * locks in this order to avoid deadlocks */
371
        avl_tree_wlock (source->pending_tree);
372
        avl_tree_wlock (source->client_tree);
373

374
        if (source->on_demand == 0 && source->format == NULL)
375 376
        {
            INFO1 ("source mount %s is not available", source->mount);
377
            break;
378
        }
379
        if (source->format && dest->format)
380
        {
381 382 383 384 385
            if (source->format->type != dest->format->type)
            {
                WARN2 ("stream %s and %s are of different types, ignored", source->mount, dest->mount);
                break;
            }
386
        }
387

388 389 390 391 392 393 394
        while (1)
        {
            avl_node *node = avl_get_first (source->pending_tree);
            if (node == NULL)
                break;
            client = (client_t *)(node->key);
            avl_delete (source->pending_tree, client, NULL);
395

396 397 398 399 400 401 402 403
            /* when switching a client to a different queue, be wary of the 
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
404 405
                if (source->con == NULL)
                    client->intro_offset = -1;
406 407
            }

408
            avl_insert (dest->pending_tree, (void *)client);
409
            count++;
410 411 412 413 414 415 416 417 418 419 420
        }

        while (1)
        {
            avl_node *node = avl_get_first (source->client_tree);
            if (node == NULL)
                break;

            client = (client_t *)(node->key);
            avl_delete (source->client_tree, client, NULL);

421 422 423 424 425 426 427 428
            /* when switching a client to a different queue, be wary of the 
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
429 430
                if (source->con == NULL)
                    client->intro_offset = -1;
431
            }
432
            avl_insert (dest->pending_tree, (void *)client);
433
            count++;
434
        }
435 436
        INFO2 ("passing %lu listeners to \"%s\"", count, dest->mount);

437 438 439 440 441
        source->listeners = 0;
        stats_event (source->mount, "listeners", "0");

    } while (0);

442 443 444
    avl_tree_unlock (source->pending_tree);
    avl_tree_unlock (source->client_tree);

445 446 447 448
    /* see if we need to wake up an on-demand relay */
    if (dest->running == 0 && dest->on_demand && count)
        dest->on_demand_req = 1;

449 450 451 452
    avl_tree_unlock (dest->pending_tree);
    thread_mutex_unlock (&move_clients_mutex);
}

453

Karl Heyes's avatar
Karl Heyes committed
454 455 456 457 458 459 460 461 462 463 464 465 466
/* Wait for and read the next piece of stream data from the source.
 *
 * Returns the refbuf produced by the format handler, or NULL when nothing
 * was read: the wait timed out, the wait failed, or the source/server is no
 * longer running.  source->running is cleared in here on an unrecoverable
 * socket error, on a read timeout, and when the source connection reports
 * an error.  The read/sent byte stats are refreshed at most every 5 seconds.
 */
static refbuf_t *get_next_buffer (source_t *source)
{
    refbuf_t *refbuf = NULL;
    /* wait up to 250ms for data unless a short delay was requested */
    const int delay = source->short_delay ? 0 : 250;

    while (global.running == ICE_RUNNING && source->running)
    {
        time_t current = time (NULL);
        int fds = 0;

        if (source->client == NULL)
        {
            /* no source client to wait on, just pause; fds stays 0 */
            thread_sleep (delay*1000);
            source->last_read = current;
        }
        else
        {
            fds = util_timed_wait_for_fd (source->con->sock, delay);
        }

        if (current >= source->client_stats_update)
        {
            stats_event_args (source->mount, "total_bytes_read",
                    "%"PRIu64, source->format->read_bytes);
            stats_event_args (source->mount, "total_bytes_sent",
                    "%"PRIu64, source->format->sent_bytes);
            source->client_stats_update = current + 5;
        }

        if (fds < 0)
        {
            if (! sock_recoverable (sock_error()))
            {
                WARN0 ("Error while waiting on socket, Disconnecting source");
                source->running = 0;
            }
            break;
        }

        if (fds == 0)
        {
            /* nothing to read this time, check for the source going quiet */
            thread_mutex_lock(&source->lock);
            if ((source->last_read + (time_t)source->timeout) < current)
            {
                DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read,
                        source->timeout, (long)current);
                WARN0 ("Disconnecting source due to socket timeout");
                source->running = 0;
            }
            thread_mutex_unlock(&source->lock);
            break;
        }

        source->last_read = current;
        refbuf = source->format->get_buffer (source);

        if (source->client->con && source->client->con->error)
        {
            INFO1 ("End of Stream %s", source->mount);
            source->running = 0;
            continue;
        }

        if (refbuf)
            break;
    }

    return refbuf;
}


/* Write queued data to one listener.  At most 10 write passes are made and
 * writing stops once more than 20000 bytes have gone out, so that a single
 * client cannot hog the source thread.  deletion_expected says the oldest
 * buffer on the queue is about to be dropped: a client still pointing at it
 * after writing has fallen too far behind and is flagged for removal.
 */
static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
{
    int passes_left = 10;   /* max number of iterations in one go */
    int total_written = 0;

    for (;;)
    {
        int bytes;

        /* check for limited listener time */
        if (client->con->discon_time && time(NULL) >= client->con->discon_time)
        {
            INFO1 ("time limit reached for client #%lu", client->con->id);
            client->con->error = 1;
        }

        /* jump out if client connection has died */
        if (client->con->error)
            break;

        /* lets not send too much to one client in one go, but don't
           sleep for too long if more data can be sent */
        if (total_written > 20000 || passes_left == 0)
        {
            if (client->check_buffer != format_check_file_buffer)
                source->short_delay = 1;
            break;
        }
        passes_left--;

        if (client->check_buffer (source, client) < 0)
            break;

        bytes = client->write_to_client (client);
        if (bytes <= 0)
            break;  /* can't write any more */

        total_written += bytes;
    }

    source->format->sent_bytes += total_written;

    /* the refbuf referenced at head (last in queue) may be marked for deletion
     * if so, check to see if this client is still referring to it */
    if (deletion_expected && client->refbuf && client->refbuf == source->stream_data)
    {
        INFO2 ("Client %lu (%s) has fallen too far behind, removing",
                client->con->id, client->con->ip);
        stats_event_inc (source->mount, "slow_listeners");
        client->con->error = 1;
    }
}

Jack Moffitt's avatar
Jack Moffitt committed
582

583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605
/* size of the buffer the expanded dump file name is built in; limits.h is
 * allowed to leave PATH_MAX undefined so fall back to a fixed size then */
#if defined(PATH_MAX)
#define DUMPFILE_PATH_MAX PATH_MAX
#else
#define DUMPFILE_PATH_MAX 4096
#endif

/* Open the file for stream dumping.
 * This function should do all processing of the filename.
 *
 * On non-Windows builds the name is first run through strftime() using the
 * current local time, so it may contain conversion specifications such as
 * %Y or %H.  The file is opened in binary append mode.
 *
 * Returns the open stream, or NULL with errno set: EINVAL for a NULL name,
 * ENAMETOOLONG when the expanded name does not fit the buffer, otherwise
 * whatever localtime() or fopen() reported.
 */
static FILE * source_open_dumpfile(const char * filename) {
#ifndef _WIN32
    /* some of the below functions seems not to be standard winapi functions */
    char buffer[DUMPFILE_PATH_MAX];
    time_t curtime;
    struct tm *loctime;
#endif

    if (filename == NULL)
    {
        errno = EINVAL;
        return NULL;
    }

#ifndef _WIN32
    /* Get the current time. */
    curtime = time (NULL);

    /* Convert it to local time representation. */
    loctime = localtime (&curtime);
    if (loctime == NULL)
        return NULL;

    /* strftime returns 0 and leaves the buffer contents unspecified when
     * the result does not fit, never hand such a buffer to fopen */
    if (strftime (buffer, sizeof(buffer), filename, loctime) == 0
            && filename[0] != '\0')
    {
        errno = ENAMETOOLONG;
        return NULL;
    }
    filename = buffer;
#endif

    return fopen (filename, "ab");
}

Karl Heyes's avatar
Karl Heyes committed
606 607 608
/* Perform any initialisation just before the stream data is processed, the header
 * info is processed by now and the format details are setup
 */
609
static void source_init (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
610
{
611
    ice_config_t *config = config_get_config();
612 613
    char *listenurl;
    const char *str;
614
    int listen_url_size;
615
    mount_proxy *mountinfo;
616

617
    /* 6 for max size of port */
618
    listen_url_size = strlen("http://") + strlen(config->hostname) +
619
        strlen(":") + 6 + strlen(source->mount) + 1;
620 621 622

    listenurl = malloc (listen_url_size);
    memset (listenurl, '\000', listen_url_size);
623 624
    snprintf (listenurl, listen_url_size, "http://%s:%d%s",
            config->hostname, config->port, source->mount);
625 626
    config_release_config();

627 628 629 630 631 632 633 634
    str = httpp_getvar(source->parser, "ice-audio-info");
    source->audio_info = util_dict_new();
    if (str)
    {
        _parse_audio_info (source, str);
        stats_event (source->mount, "audio_info", str);
    }

635 636
    stats_event (source->mount, "listenurl", listenurl);

Michael Smith's avatar
Michael Smith committed
637
    free(listenurl);
638

639 640
    if (source->dumpfilename != NULL)
    {
641
        source->dumpfile = source_open_dumpfile (source->dumpfilename);
642 643 644 645 646 647 648
        if (source->dumpfile == NULL)
        {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    source->dumpfilename, strerror(errno));
        }
    }

649 650 651 652 653 654
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock (source->shutdown_rwlock);

    /* start off the statistics */
    source->listeners = 0;
    stats_event_inc (NULL, "source_total_connections");
655
    stats_event (source->mount, "slow_listeners", "0");
656
    stats_event_args (source->mount, "listeners", "%lu", source->listeners);
657
    stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
Karl Heyes's avatar
Karl Heyes committed
658
    stats_event_time (source->mount, "stream_start");
659

660
    DEBUG0("Source creation complete");
Karl Heyes's avatar
Karl Heyes committed
661
    source->last_read = time (NULL);
662
    source->prev_listeners = -1;
663
    source->running = 1;
664

665
    mountinfo = config_find_mount (config_get_config(), source->mount, MOUNT_TYPE_NORMAL);
666 667 668 669
    if (mountinfo)
    {
        if (mountinfo->on_connect)
            source_run_script (mountinfo->on_connect, source->mount);
670
        auth_stream_start (mountinfo, source->mount);
671
    }
672 673
    config_release_config();

Michael Smith's avatar
Michael Smith committed
674 675
    /*
    ** Now, if we have a fallback source and override is on, we want
676
    ** to steal its clients, because it means we've come back online
Michael Smith's avatar
Michael Smith committed
677 678 679 680
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

681 682 683 684
    if (source->fallback_override && source->fallback_mount)
    {
        source_t *fallback_source;

Michael Smith's avatar
Michael Smith committed
685 686 687
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);

688 689
        if (fallback_source)
            source_move_clients (fallback_source, source);
Michael Smith's avatar
Michael Smith committed
690

691
        avl_tree_unlock(global.source_tree);
Michael Smith's avatar
Michael Smith committed
692
    }
693 694 695 696 697
}


/* Main loop of a source.  After source_init() it repeatedly reads a buffer
 * from the source, appends it to the stream queue and maintains the burst
 * point, writes to every listener, drops listeners whose connection is in
 * error, promotes pending listeners to active ones (subject to
 * max_listeners), updates the listener stats and trims unreferenced data
 * from the front of the queue.  When the loop ends source_shutdown() is
 * called.
 */
void source_main (source_t *source)
{
    source_init (source);

    while (global.running == ICE_RUNNING && source->running)
    {
        avl_node *node, *next;
        client_t *client;
        int remove_from_q = 0;
        refbuf_t *refbuf = get_next_buffer (source);

        source->short_delay = 0;

        if (refbuf)
        {
            /* append buffer to the in-flight data queue,  */
            if (source->stream_data == NULL)
            {
                source->stream_data = refbuf;
                source->burst_point = refbuf;
            }
            if (source->stream_data_tail)
                source->stream_data_tail->next = refbuf;
            source->stream_data_tail = refbuf;
            source->queue_size += refbuf->len;
            /* new buffer is referenced for burst */
            refbuf_addref (refbuf);

            /* new data on queue, so advance the burst point while there is
             * more than burst_size after it and a later buffer to move to */
            source->burst_offset += refbuf->len;
            while (source->burst_offset > source->burst_size
                    && source->burst_point->next)
            {
                refbuf_t *to_release = source->burst_point;

                source->burst_point = to_release->next;
                source->burst_offset -= to_release->len;
                refbuf_release (to_release);
            }

            /* save stream to file */
            if (source->dumpfile && source->format->write_buf_to_file)
                source->format->write_buf_to_file (source, refbuf);
        }

        /* lets see if we have too much data in the queue, but don't remove it until later */
        thread_mutex_lock(&source->lock);
        if (source->queue_size > source->queue_size_limit)
            remove_from_q = 1;
        thread_mutex_unlock(&source->lock);

        /* write locks: pending tree first, then the client tree */
        avl_tree_wlock(source->pending_tree);
        avl_tree_wlock(source->client_tree);

        /* feed each listener, removing those whose connection has failed */
        for (node = avl_get_first(source->client_tree); node; node = next)
        {
            client = (client_t *)node->key;

            send_to_listener (source, client, remove_from_q);

            /* fetch the successor before the node can be deleted */
            next = avl_get_next(node);
            if (client->con->error)
            {
                if (client->respcode == 200)
                    stats_event_dec (NULL, "listeners");
                avl_delete(source->client_tree, (void *)client, _free_client);
                source->listeners--;
                DEBUG0("Client removed");
            }
        }

        /** add pending clients **/
        for (node = avl_get_first(source->pending_tree); node; node = next)
        {
            client = (client_t *)node->key;
            next = avl_get_next(node);

            if (source->max_listeners != -1 &&
                    source->listeners >= (unsigned long)source->max_listeners)
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                avl_delete(source->pending_tree, (void *)client, _free_client);

                INFO0("Client deleted, exceeding maximum listeners for this "
                        "mountpoint.");
                continue;
            }

            /* Otherwise, the client is accepted, add it */
            avl_insert(source->client_tree, client);

            source->listeners++;
            DEBUG0("Client added");
            stats_event_inc(source->mount, "connections");
        }

        /** clear pending tree **/
        while ((node = avl_get_first(source->pending_tree)) != NULL)
            avl_delete(source->pending_tree, node->key, source_remove_client);

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* update the stats if need be */
        if (source->listeners != source->prev_listeners)
        {
            source->prev_listeners = source->listeners;
            INFO2("listener count on %s now %lu", source->mount, source->listeners);
            if (source->listeners > source->peak_listeners)
            {
                source->peak_listeners = source->listeners;
                stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
            }
            stats_event_args (source->mount, "listeners", "%lu", source->listeners);
            if (source->listeners == 0 && source->on_demand)
                source->running = 0;
        }

        /* lets reduce the queue, any lagging clients should of been
         * terminated by now
         */
        if (source->stream_data)
        {
            /* normal unreferenced queue data will have a refcount 1, but
             * burst queue data will be at least 2, active clients will also
             * increase refcount */
            while (source->stream_data->_count == 1)
            {
                refbuf_t *to_go = source->stream_data;

                if (to_go->next == NULL || source->burst_point == to_go)
                {
                    /* this should not happen */
                    ERROR0 ("queue state is unexpected");
                    source->running = 0;
                    break;
                }
                source->stream_data = to_go->next;
                source->queue_size -= to_go->len;
                to_go->next = NULL;
                refbuf_release (to_go);
            }
        }

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }

    source_shutdown (source);
}
Jack Moffitt's avatar
Jack Moffitt committed
864

Michael Smith's avatar
Michael Smith committed
865

Karl Heyes's avatar
Karl Heyes committed
866 867
/* Hand the listeners of a stopping source over to its configured fallback
 * mount, provided that mount can be found.  The global source tree is
 * read-locked for the duration of the lookup and the move.
 */
static void shutdown_move_listeners (source_t *source)
{
    source_t *target;

    avl_tree_rlock (global.source_tree);
    target = source_find_mount (source->fallback_mount);
    if (target)
        source_move_clients (source, target);
    avl_tree_unlock (global.source_tree);
}


/* Tear down a source that has finished streaming.
 *
 * In order: mark the source as not running, log the exit, run the mount's
 * on_disconnect script (if one is configured) and signal the end of the
 * stream to the auth layer, move any listeners to the fallback mount,
 * drop the stats for this mount, clear the source and update the global
 * source count.  The last step unlocks source->shutdown_rwlock.
 */
static void source_shutdown (source_t *source)
{
    mount_proxy *mount_cfg;

    source->running = 0;
    INFO1("Source \"%s\" exiting", source->mount);

    /* mount specific settings are only valid while the config is held */
    mount_cfg = config_find_mount (config_get_config(), source->mount, MOUNT_TYPE_NORMAL);
    if (mount_cfg != NULL)
    {
        if (mount_cfg->on_disconnect != NULL)
            source_run_script (mount_cfg->on_disconnect, source->mount);
        auth_stream_end (mount_cfg, source->mount);
    }
    config_release_config();

    /* the source is de-activated at this point so no further clients will
     * be added; whatever listeners remain go to the fallback (if any)
     */
    if (source->fallback_mount)
        shutdown_move_listeners (source);

    /* delete this source's stats */
    stats_event (source->mount, NULL, NULL);

    /* the source stays in the tree, it may be a relay and therefore
     * reserved */
    source_clear_source (source);

    /* one source fewer, publish the new total */
    global_lock();
    global.sources--;
    stats_event_args (NULL, "sources", "%d", global.sources);
    global_unlock();

    /* release our hold on the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock (source->shutdown_rwlock);
}

914

Jack Moffitt's avatar
Jack Moffitt committed
915 916
/* Comparison callback for two client_t objects, ordered by the id of the
 * connection each one holds.
 *
 * compare_arg is not used.  a and b are treated as client_t pointers.
 * Returns -1 when a's connection id is lower than b's, 1 when it is
 * higher and 0 when both ids are equal.
 */
static int _compare_clients(void *compare_arg, void *a, void *b)
{
    client_t *lhs = a;
    client_t *rhs = b;
    connection_t *lhs_con = lhs->con;
    connection_t *rhs_con = rhs->con;

    (void)compare_arg;

    if (lhs_con->id == rhs_con->id)
        return 0;

    return (lhs_con->id < rhs_con->id) ? -1 : 1;
}

929
/* Callback that does nothing with the supplied key and always returns 1. */
int source_remove_client(void *key)
{
    (void)key;   /* deliberately untouched */

    return 1;
}

static int _free_client(void *key)
{
936 937
    client_t *client = (client_t *)key;

938 939 940 941 942
    /* if no response has been sent then send a 404 */
    if (client->respcode == 0)
        client_send_404 (client, "Mount unavailable");
    else
        client_destroy(client);
943 944
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
945
}
946

947
/* Parse a ';'-separated list of "name=value" settings and apply each one
 * to the source.
 *
 * For every complete pair the value is URL-unescaped, stored in
 * source->audio_info under the given name and published as a stat for
 * source->mount.  Segments that are empty, lack an '=', have an empty
 * value, or whose name does not fit the local buffer are skipped.
 *
 * source: the source whose audio_info dictionary and stats are updated
 * s:      the settings string; NULL or "" results in no action
 */
static void _parse_audio_info (source_t *source, const char *s)
{
    const char *start = s;

    while (start != NULL && *start != '\0')
    {
        const char *next = strchr (start, ';');
        size_t len;

        if (next == NULL)
            len = strlen (start);
        else
        {
            len = (size_t)(next - start);
            next++; /* skip past the ';' */
        }
        if (len)
        {
            char name[100], value[200];

            /* Only act when both conversions succeeded: on a partial match
             * sscanf leaves the remaining buffers unset, and using them
             * would read uninitialised memory.  ';' is excluded from the
             * name so a segment without '=' cannot run into the next pair.
             */
            if (sscanf (start, "%99[^=;]=%199[^;\r\n]", name, value) == 2)
            {
                char *esc = util_url_unescape (value);

                if (esc)
                {
                    util_dict_set (source->audio_info, name, esc);
                    stats_event (source->mount, name, esc);
                    free (esc);
                }
            }
        }
        start = next;
    }
}
978 979


980
/* Apply the mountinfo details to the source */
981
static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
982
{
983
    const char *str;
984 985 986
    int val;
    http_parser_t *parser = NULL;

987
    DEBUG1("Applying mount information for \"%s\"", source->mount);
988
    avl_tree_rlock (source->client_tree);
989 990
    stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);

991 992 993 994 995 996 997 998 999 1000 1001 1002 1003
    if (mountinfo)
    {
        source->max_listeners = mountinfo->max_listeners;
        source->fallback_override = mountinfo->fallback_override;
        source->hidden = mountinfo->hidden;
    }

    /* if a setting is available in the mount details then use it, else
     * check the parser details. */

    if (source->client)
        parser = source->client->parser;

1004 1005 1006 1007
    /* to be done before possible non-utf8 stats */
    if (source->format && source->format->apply_settings)
        source->format->apply_settings (source->client, source->format, mountinfo);

1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039
    /* public */
    if (mountinfo && mountinfo->yp_public >= 0)
        val = mountinfo->yp_public;
    else
    {
        do {
            str = httpp_getvar (parser, "ice-public");
            if (str) break;
            str = httpp_getvar (parser, "icy-pub");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-public");
            if (str) break;
            /* handle header from icecast v2 release */
            str = httpp_getvar (parser, "icy-public");
            if (str) break;
            str = "0";
        } while (0);
        val = atoi (str);
    }
    stats_event_args (source->mount, "public", "%d", val);
    if (source->yp_public != val)
    {
        DEBUG1 ("YP changed to %d", val);
        if (val)
            yp_add (source->mount);
        else
            yp_remove (source->mount);
        source->yp_public = val;
    }

    /* stream name */
    if (mountinfo && mountinfo->stream_name)
1040
        stats_event (source->mount, "server_name", mountinfo->stream_name);
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
    else
    {
        do {
            str = httpp_getvar (parser, "ice-name");
            if (str) break;
            str = httpp_getvar (parser, "icy-name");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-name");
            if (str) break;
            str = "Unspecified name";
        } while (0);
1052 1053
        if (source->format)
            stats_event_conv (source->mount, "server_name", str, source->format->charset);
1054 1055 1056 1057
    }

    /* stream description */
    if (mountinfo && mountinfo->stream_description)
1058
        stats_event (source->mount, "server_description", mountinfo->stream_description);