source.c 39.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

13
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14
15
16
17
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
18
19
20
21
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
22
#include <ogg/ogg.h>
Michael Smith's avatar
Michael Smith committed
23
#include <errno.h>
24
25

#ifndef _WIN32
26
#include <unistd.h>
Jack Moffitt's avatar
Jack Moffitt committed
27
#include <sys/time.h>
28
#include <sys/socket.h>
29
#include <sys/wait.h>
30
#else
31
32
#include <winsock2.h>
#include <windows.h>
33
#define snprintf _snprintf
34
#endif
Jack Moffitt's avatar
Jack Moffitt committed
35

Karl Heyes's avatar
Karl Heyes committed
36
37
38
39
#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
40
41
42
43
44
45

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
46
#include "logging.h"
47
#include "cfgfile.h"
48
#include "util.h"
Jack Moffitt's avatar
Jack Moffitt committed
49
#include "source.h"
Michael Smith's avatar
Michael Smith committed
50
#include "format.h"
51
#include "fserve.h"
Michael Smith's avatar
Michael Smith committed
52
#include "auth.h"
Karl Heyes's avatar
Karl Heyes committed
53
#include "os.h"
Jack Moffitt's avatar
Jack Moffitt committed
54

55
56
57
#undef CATMODULE
#define CATMODULE "source"

Michael Smith's avatar
Michael Smith committed
58
59
#define MAX_FALLBACK_DEPTH 10

60
61
mutex_t move_clients_mutex;

Jack Moffitt's avatar
Jack Moffitt committed
62
63
64
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
65
static void _parse_audio_info (source_t *source, const char *s);
Karl Heyes's avatar
Karl Heyes committed
66
static void source_shutdown (source_t *source);
67
68
69
70
71
#ifdef _WIN32
#define source_run_script(x,y)  WARN0("on [dis]connect scripts disabled");
#else
static void source_run_script (char *command, char *mountpoint);
#endif
Jack Moffitt's avatar
Jack Moffitt committed
72

73
74
75
76
77
78
79
80
/* Allocate a new source with the stated mountpoint, if one already
 * exists with that mountpoint in the global source tree then return
 * NULL.
 */
source_t *source_reserve (const char *mount)
{
    source_t *src = NULL;

81
82
83
84
    if(mount[0] != '/')
        WARN1("Source at \"%s\" does not start with '/', clients will be "
                "unable to connect", mount);

85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
    do
    {
        avl_tree_wlock (global.source_tree);
        src = source_find_mount_raw (mount);
        if (src)
        {
            src = NULL;
            break;
        }

        src = calloc (1, sizeof(source_t));
        if (src == NULL)
            break;

        src->client_tree = avl_tree_new(_compare_clients, NULL);
        src->pending_tree = avl_tree_new(_compare_clients, NULL);

        /* make duplicates for strings or similar */
        src->mount = strdup (mount);
        src->max_listeners = -1;

        avl_insert (global.source_tree, src);

    } while (0);

    avl_tree_unlock (global.source_tree);
    return src;
}


Michael Smith's avatar
Michael Smith committed
115
116
117
118
/* Find a mount with this raw name - ignoring fallbacks. You should have the
 * global source tree locked to call this.
 */
source_t *source_find_mount_raw(const char *mount)
Jack Moffitt's avatar
Jack Moffitt committed
119
{
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
    source_t *source;
    avl_node *node;
    int cmp;

    if (!mount) {
        return NULL;
    }
    /* get the root node */
    node = global.source_tree->root->right;
    
    while (node) {
        source = (source_t *)node->key;
        cmp = strcmp(mount, source->mount);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return source;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
143
144
}

145
146

/* Search for mount, if the mount is there but not currently running then
Karl Heyes's avatar
Karl Heyes committed
147
 * check the fallback, and so on.  Must have a global source lock to call
148
149
150
 * this function.
 */
source_t *source_find_mount (const char *mount)
Michael Smith's avatar
Michael Smith committed
151
{
152
    source_t *source = NULL;
Michael Smith's avatar
Michael Smith committed
153
    ice_config_t *config;
154
155
156
157
    mount_proxy *mountinfo;
    int depth = 0;

    config = config_get_config();
158
    while (mount && depth < MAX_FALLBACK_DEPTH)
159
160
    {
        source = source_find_mount_raw(mount);
Michael Smith's avatar
Michael Smith committed
161

162
163
164
165
166
        if (source)
        {
            if (source->running || source->on_demand)
                break;
        }
Michael Smith's avatar
Michael Smith committed
167

168
169
170
171
        /* we either have a source which is not active (relay) or no source
         * at all. Check the mounts list for fallback settings
         */
        mountinfo = config_find_mount (config, mount);
172
        source = NULL;
173
174
175
176

        if (mountinfo == NULL)
            break;
        mount = mountinfo->fallback_mount;
177
        depth++;
Michael Smith's avatar
Michael Smith committed
178
179
    }

180
    config_release_config();
Michael Smith's avatar
Michael Smith committed
181
182
183
184
    return source;
}


Jack Moffitt's avatar
Jack Moffitt committed
185
186
int source_compare_sources(void *arg, void *a, void *b)
{
187
188
    source_t *srca = (source_t *)a;
    source_t *srcb = (source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
189

190
    return strcmp(srca->mount, srcb->mount);
Jack Moffitt's avatar
Jack Moffitt committed
191
192
}

193
194
195
196
197
198

/* Return a source to its idle state without removing it from the global
 * source tree or freeing the source_t itself.
 *
 * The source client is destroyed, the dump and intro files are closed, all
 * listeners in the client and pending trees are deleted through
 * _free_client, the format plugin is freed, the queued stream data is
 * released and the per-stream settings and counters are reset.  The mount
 * name and both (now empty) trees are kept.
 */
void source_clear_source (source_t *source)
{
    DEBUG1 ("clearing source \"%s\"", source->mount);
    client_destroy(source->client);
    source->client = NULL;
    source->parser = NULL;
    source->con = NULL;

    if (source->dumpfile)
    {
        INFO1 ("Closing dumpfile for %s", source->mount);
        fclose (source->dumpfile);
        source->dumpfile = NULL;
    }

    /* lets kick off any clients that are left on here.  Nodes are deleted,
     * so the trees are modified and need the write lock, a read lock would
     * let other readers walk a tree that is changing under them */
    avl_tree_wlock (source->client_tree);
    while (avl_get_first (source->client_tree))
    {
        avl_delete (source->client_tree,
                avl_get_first (source->client_tree)->key, _free_client);
    }
    avl_tree_unlock (source->client_tree);

    avl_tree_wlock (source->pending_tree);
    while (avl_get_first (source->pending_tree))
    {
        avl_delete (source->pending_tree,
                avl_get_first(source->pending_tree)->key, _free_client);
    }
    avl_tree_unlock (source->pending_tree);

    if (source->format && source->format->free_plugin)
        source->format->free_plugin (source->format);
    source->format = NULL;

    /* Lets clear out the source queue too */
    while (source->stream_data)
    {
        refbuf_t *p = source->stream_data;
        source->stream_data = p->next;
        /* can be referenced by burst handler as well, drop the extra
         * references first and then the queue's own one */
        while (p->_count > 1)
            refbuf_release (p);
        refbuf_release (p);
    }
    source->stream_data_tail = NULL;

    source->burst_point = NULL;
    source->burst_size = 0;
    source->burst_offset = 0;
    source->queue_size = 0;
    source->queue_size_limit = 0;
    source->listeners = 0;
    source->shoutcast_compat = 0;
    source->max_listeners = -1;     /* -1 is treated as "no limit" by source_main */
    source->hidden = 0;
    source->client_stats_update = 0;
    util_dict_free (source->audio_info);
    source->audio_info = NULL;

    free(source->fallback_mount);
    source->fallback_mount = NULL;

    free(source->dumpfilename);
    source->dumpfilename = NULL;

    if (source->intro_file)
    {
        fclose (source->intro_file);
        source->intro_file = NULL;
    }

    source->on_demand_req = 0;
}


271
/* Remove the provided source from the global tree and free it */
272
void source_free_source (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
273
{
274
275
276
277
278
    DEBUG1 ("freeing source \"%s\"", source->mount);
    avl_tree_wlock (global.source_tree);
    avl_delete (global.source_tree, source, NULL);
    avl_tree_unlock (global.source_tree);

279
280
    avl_tree_free(source->pending_tree, _free_client);
    avl_tree_free(source->client_tree, _free_client);
281

282
283
284
    /* make sure all YP entries have gone */
    yp_remove (source->mount);

285
286
    free (source->mount);
    free (source);
Jack Moffitt's avatar
Jack Moffitt committed
287

288
    return;
Jack Moffitt's avatar
Jack Moffitt committed
289
}
290

291

292
293
294
/* Find the listener with the given connection id in the source's client
 * tree (the pending tree is not searched).  The client tree is read locked
 * for the duration of the lookup.
 *
 * Returns the matching client, or NULL when none is found.
 */
client_t *source_find_client(source_t *source, int id)
{
    connection_t probe_con;
    client_t probe;
    client_t *found = NULL;
    void *match;

    /* build a throwaway key that carries only the connection id */
    probe_con.id = id;
    probe.con = &probe_con;

    avl_tree_rlock(source->client_tree);
    if (avl_get_by_key(source->client_tree, &probe, &match) == 0)
        found = match;
    avl_tree_unlock(source->client_tree);

    return found;
}
311

312

313
314
315
316
317
/* Move clients from source to dest provided dest is running
 * and that the stream format is the same.
 * The only lock that should be held when this is called is the
 * source tree lock
 */
318
319
void source_move_clients (source_t *source, source_t *dest)
{
320
    unsigned long count = 0;
321
322
323
324
325
    if (strcmp (source->mount, dest->mount) == 0)
    {
        WARN1 ("src and dst are the same \"%s\", skipping", source->mount);
        return;
    }
326
327
328
329
    /* we don't want the two write locks to deadlock in here */
    thread_mutex_lock (&move_clients_mutex);

    /* if the destination is not running then we can't move clients */
330

331
    if (dest->running == 0 && dest->on_demand == 0)
332
    {
333
334
        WARN1 ("destination mount %s not running, unable to move clients ", dest->mount);
        thread_mutex_unlock (&move_clients_mutex);
335
336
337
338
        return;
    }

    avl_tree_wlock (dest->pending_tree);
339
    do
340
    {
341
        client_t *client;
342

343
344
        /* we need to move the client and pending trees */
        avl_tree_wlock (source->pending_tree);
345

346
        if (source->on_demand == 0 && source->format == NULL)
347
348
        {
            INFO1 ("source mount %s is not available", source->mount);
349
            break;
350
        }
351
        if (source->format && dest->format)
352
        {
353
354
355
356
357
            if (source->format->type != dest->format->type)
            {
                WARN2 ("stream %s and %s are of different types, ignored", source->mount, dest->mount);
                break;
            }
358
        }
359

360
361
362
363
364
365
366
        while (1)
        {
            avl_node *node = avl_get_first (source->pending_tree);
            if (node == NULL)
                break;
            client = (client_t *)(node->key);
            avl_delete (source->pending_tree, client, NULL);
367

368
369
370
371
372
373
374
375
            /* when switching a client to a different queue, be wary of the 
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
376
377
                if (source->con == NULL)
                    client->intro_offset = -1;
378
379
            }

380
            avl_insert (dest->pending_tree, (void *)client);
381
            count++;
382
383
384
385
386
387
388
389
390
391
392
393
        }

        avl_tree_wlock (source->client_tree);
        while (1)
        {
            avl_node *node = avl_get_first (source->client_tree);
            if (node == NULL)
                break;

            client = (client_t *)(node->key);
            avl_delete (source->client_tree, client, NULL);

394
395
396
397
398
399
400
401
            /* when switching a client to a different queue, be wary of the 
             * refbuf it's referring to, if it's http headers then we need
             * to write them so don't release it.
             */
            if (client->check_buffer != format_check_http_buffer)
            {
                client_set_queue (client, NULL);
                client->check_buffer = format_check_file_buffer;
402
403
                if (source->con == NULL)
                    client->intro_offset = -1;
404
            }
405
            avl_insert (dest->pending_tree, (void *)client);
406
            count++;
407
        }
408
409
        INFO2 ("passing %lu listeners to \"%s\"", count, dest->mount);

410
411
412
413
414
415
        source->listeners = 0;
        stats_event (source->mount, "listeners", "0");
        avl_tree_unlock (source->client_tree);

    } while (0);

416
417
418
419
420
421
422
    /* see if we need to wake up an on-demand relay */
    if (dest->running == 0 && dest->on_demand && count)
    {
        dest->on_demand_req = 1;
        slave_rebuild_mounts();
    }

423
    avl_tree_unlock (source->pending_tree);
424
425
426
427
    avl_tree_unlock (dest->pending_tree);
    thread_mutex_unlock (&move_clients_mutex);
}

428

Karl Heyes's avatar
Karl Heyes committed
429
430
431
432
433
434
435
436
437
438
439
440
441
/* Wait for and read the next block of stream data from the source.
 *
 * Waits up to 250ms for the source socket (no wait at all when the source
 * has short_delay set); a source without a client just sleeps for that
 * time.  The byte counter stats are refreshed every 5 seconds on the way.
 * The source is marked as not running on an unrecoverable socket error,
 * when nothing was read for longer than the source timeout, or when the
 * source connection reports an error after the read.
 *
 * Returns a refbuf from the format handler, or NULL when the wait ended
 * without data or the source stopped.
 */
static refbuf_t *get_next_buffer (source_t *source)
{
    refbuf_t *buf = NULL;
    int wait_ms = source->short_delay ? 0 : 250;

    while (global.running == ICE_RUNNING && source->running)
    {
        time_t now = time (NULL);
        int ready = 0;

        if (source->client)
            ready = util_timed_wait_for_fd (source->con->sock, wait_ms);
        else
        {
            /* nothing to wait on, idle and keep the timeout from firing */
            thread_sleep (wait_ms*1000);
            source->last_read = now;
        }

        if (now >= source->client_stats_update)
        {
            stats_event_args (source->mount, "total_bytes_read",
                    FORMAT_UINT64, source->format->read_bytes);
            stats_event_args (source->mount, "total_bytes_sent",
                    FORMAT_UINT64, source->format->sent_bytes);
            source->client_stats_update = now + 5;
        }

        if (ready <= 0)
        {
            if (ready < 0)
            {
                /* the wait itself failed */
                if (! sock_recoverable (sock_error()))
                {
                    WARN0 ("Error while waiting on socket, Disconnecting source");
                    source->running = 0;
                }
            }
            else if (source->last_read + (time_t)source->timeout < now)
            {
                /* no data this time and the source has been quiet too long */
                DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read,
                        source->timeout, (long)now);
                WARN0 ("Disconnecting source due to socket timeout");
                source->running = 0;
            }
            break;
        }

        source->last_read = now;
        buf = source->format->get_buffer (source);
        if (source->client->con && source->client->con->error)
        {
            INFO1 ("End of Stream %s", source->mount);
            source->running = 0;
            continue;   /* the loop condition ends the loop */
        }
        if (buf)
            break;
    }

    return buf;
}


/* Send queued data to one listener.
 *
 * Up to 10 write rounds are made, stopping early once more than 20000
 * bytes went out, the client's buffer check fails, the write returns
 * nothing, or the connection is flagged as failed (which also happens here
 * when the client's disconnect time has been reached).  The bytes written
 * are added to the format's sent_bytes counter.
 *
 * deletion_expected says the oldest buffer on the source queue is about to
 * be removed; a client still referring to that buffer after writing has
 * fallen too far behind and is flagged for removal.
 */
static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
{
    int rounds_left = 10;   /* max number of iterations in one go */
    int sent = 0;

    for (;;)
    {
        int written;

        /* check for limited listener time */
        if (client->con->discon_time && time(NULL) >= client->con->discon_time)
        {
            INFO1 ("time limit reached for client #%lu", client->con->id);
            client->con->error = 1;
        }

        /* jump out if client connection has died */
        if (client->con->error)
            break;

        /* lets not send too much to one client in one go, but don't
           sleep for too long if more data can be sent */
        if (sent > 20000 || rounds_left == 0)
        {
            if (client->check_buffer != format_check_file_buffer)
                source->short_delay = 1;
            break;
        }
        rounds_left--;

        if (client->check_buffer (source, client) < 0)
            break;

        written = client->write_to_client (client);
        if (written <= 0)
            break;  /* can't write any more */

        sent += written;
    }
    source->format->sent_bytes += sent;

    /* the refbuf referenced at head (last in queue) may be marked for deletion
     * if so, check to see if this client is still referring to it */
    if (deletion_expected && client->refbuf && client->refbuf == source->stream_data)
    {
        INFO2 ("Client %lu (%s) has fallen too far behind, removing",
                client->con->id, client->con->ip);
        stats_event_inc (source->mount, "slow_listeners");
        client->con->error = 1;
    }
}

Jack Moffitt's avatar
Jack Moffitt committed
555

Karl Heyes's avatar
Karl Heyes committed
556
557
558
/* Perform any initialisation just before the stream data is processed, the header
 * info is processed by now and the format details are setup
 */
559
static void source_init (source_t *source)
Jack Moffitt's avatar
Jack Moffitt committed
560
{
561
    ice_config_t *config = config_get_config();
562
    char *listenurl, *str;
563
    int listen_url_size;
564
    mount_proxy *mountinfo;
565

566
    /* 6 for max size of port */
567
    listen_url_size = strlen("http://") + strlen(config->hostname) +
568
        strlen(":") + 6 + strlen(source->mount) + 1;
569
570
571

    listenurl = malloc (listen_url_size);
    memset (listenurl, '\000', listen_url_size);
572
573
    snprintf (listenurl, listen_url_size, "http://%s:%d%s",
            config->hostname, config->port, source->mount);
574
575
    config_release_config();

576
577
578
579
580
581
582
583
    str = httpp_getvar(source->parser, "ice-audio-info");
    source->audio_info = util_dict_new();
    if (str)
    {
        _parse_audio_info (source, str);
        stats_event (source->mount, "audio_info", str);
    }

584
585
    stats_event (source->mount, "listenurl", listenurl);

586
587
588
    if (listenurl) {
        free(listenurl);
    }
589

Karl Heyes's avatar
Karl Heyes committed
590
591
    stats_event_args (source->mount, "listener_peak", "0");

592
593
594
595
596
597
598
599
600
601
    if (source->dumpfilename != NULL)
    {
        source->dumpfile = fopen (source->dumpfilename, "ab");
        if (source->dumpfile == NULL)
        {
            WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                    source->dumpfilename, strerror(errno));
        }
    }

602
603
604
605
606
607
    /* grab a read lock, to make sure we get a chance to cleanup */
    thread_rwlock_rlock (source->shutdown_rwlock);

    /* start off the statistics */
    source->listeners = 0;
    stats_event_inc (NULL, "source_total_connections");
608
    stats_event (source->mount, "slow_listeners", "0");
609
    stats_event_args (source->mount, "listeners", "%lu", source->listeners);
Karl Heyes's avatar
Karl Heyes committed
610
611
    stats_event (source->mount, "listener_peak", "0");
    stats_event_time (source->mount, "stream_start");
612

613
    DEBUG0("Source creation complete");
Karl Heyes's avatar
Karl Heyes committed
614
    source->last_read = time (NULL);
615
    source->running = 1;
616

617
    mountinfo = config_find_mount (config_get_config(), source->mount);
618
619
620
621
    if (mountinfo)
    {
        if (mountinfo->on_connect)
            source_run_script (mountinfo->on_connect, source->mount);
622
        auth_stream_start (mountinfo, source->mount);
623
    }
624
625
    config_release_config();

Michael Smith's avatar
Michael Smith committed
626
627
    /*
    ** Now, if we have a fallback source and override is on, we want
628
    ** to steal its clients, because it means we've come back online
Michael Smith's avatar
Michael Smith committed
629
630
631
632
    ** after a failure and they should be gotten back from the waiting
    ** loop or jingle track or whatever the fallback is used for
    */

633
634
635
636
    if (source->fallback_override && source->fallback_mount)
    {
        source_t *fallback_source;

Michael Smith's avatar
Michael Smith committed
637
638
639
        avl_tree_rlock(global.source_tree);
        fallback_source = source_find_mount(source->fallback_mount);

640
641
        if (fallback_source)
            source_move_clients (fallback_source, source);
Michael Smith's avatar
Michael Smith committed
642

643
        avl_tree_unlock(global.source_tree);
Michael Smith's avatar
Michael Smith committed
644
    }
645
646
647
648
649
}


/* Main loop of a source.  After source_init, and for as long as both the
 * server and the source are running, each pass:
 *   - fetches the next buffer and appends it to the stream queue, moving
 *     the burst point forward and writing the buffer to the dump file,
 *   - sends data to every listener and deletes the ones flagged as failed,
 *   - promotes pending clients to listeners (or deletes them when the
 *     listener limit is reached) and empties the pending tree,
 *   - publishes listener stats when the count changed, stopping an
 *     on-demand source that has no listeners left,
 *   - drops queue buffers that nothing refers to any more.
 * source_shutdown is called once the loop ends.
 */
void source_main (source_t *source)
{
    source_init (source);

    while (global.running == ICE_RUNNING && source->running)
    {
        unsigned int listeners_before;
        avl_node *node;
        client_t *client;
        refbuf_t *incoming = get_next_buffer (source);
        int trim_queue = 0;

        source->short_delay = 0;

        if (incoming)
        {
            /* append buffer to the in-flight data queue */
            if (source->stream_data == NULL)
            {
                source->stream_data = incoming;
                source->burst_point = incoming;
            }
            if (source->stream_data_tail)
                source->stream_data_tail->next = incoming;
            source->stream_data_tail = incoming;
            source->queue_size += incoming->len;
            /* new buffer is referenced for burst */
            refbuf_addref (incoming);

            /* new data on queue, so advance the burst point while more
             * than burst_size bytes follow it */
            source->burst_offset += incoming->len;
            while (source->burst_offset > source->burst_size)
            {
                refbuf_t *old_point = source->burst_point;

                if (old_point->next == NULL)
                    break;
                source->burst_point = old_point->next;
                source->burst_offset -= old_point->len;
                refbuf_release (old_point);
            }

            /* save stream to file */
            if (source->dumpfile && source->format->write_buf_to_file)
                source->format->write_buf_to_file (source, incoming);
        }

        /* lets see if we have too much data in the queue, but don't remove it until later */
        if (source->queue_size > source->queue_size_limit)
            trim_queue = 1;

        /* acquire write lock on client_tree */
        avl_tree_wlock(source->client_tree);

        listeners_before = source->listeners;

        node = avl_get_first(source->client_tree);
        while (node)
        {
            avl_node *following;

            client = (client_t *)node->key;
            send_to_listener (source, client, trim_queue);

            /* step on before the node can be deleted */
            following = avl_get_next(node);
            if (client->con->error)
            {
                avl_delete(source->client_tree, (void *)client, _free_client);
                source->listeners--;
                DEBUG0("Client removed");
            }
            node = following;
        }

        /* acquire write lock on pending_tree */
        avl_tree_wlock(source->pending_tree);

        /** add pending clients **/
        node = avl_get_first(source->pending_tree);
        while (node)
        {
            if (source->max_listeners != -1 &&
                    source->listeners >= (unsigned long)source->max_listeners)
            {
                /* The common case is caught in the main connection handler,
                 * this deals with rarer cases (mostly concerning fallbacks)
                 * and doesn't give the listening client any information about
                 * why they were disconnected
                 */
                client = (client_t *)node->key;
                node = avl_get_next(node);
                avl_delete(source->pending_tree, (void *)client, _free_client);

                INFO0("Client deleted, exceeding maximum listeners for this "
                        "mountpoint.");
            }
            else
            {
                /* the client is accepted, add it */
                avl_insert(source->client_tree, node->key);

                source->listeners++;
                DEBUG0("Client added");
                stats_event_inc(source->mount, "connections");

                node = avl_get_next(node);
            }
        }

        /** clear pending tree **/
        while ((node = avl_get_first(source->pending_tree)) != NULL)
        {
            avl_delete(source->pending_tree, node->key, source_remove_client);
        }

        /* release write lock on pending_tree */
        avl_tree_unlock(source->pending_tree);

        /* update the stats if need be */
        if (source->listeners != listeners_before)
        {
            INFO2("listener count on %s now %lu", source->mount, source->listeners);
            if (source->listeners > source->peak_listeners)
            {
                source->peak_listeners = source->listeners;
                stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
            }
            stats_event_args (source->mount, "listeners", "%lu", source->listeners);
            if (source->listeners == 0 && source->on_demand)
                source->running = 0;
        }

        /* lets reduce the queue, any lagging clients should have been
         * terminated by now
         */
        if (source->stream_data)
        {
            /* normal unreferenced queue data will have a refcount 1, but
             * burst queue data will be at least 2, active clients will also
             * increase refcount */
            while (source->stream_data->_count == 1)
            {
                refbuf_t *oldest = source->stream_data;

                if (oldest->next == NULL || source->burst_point == oldest)
                {
                    /* this should not happen */
                    ERROR0 ("queue state is unexpected");
                    source->running = 0;
                    break;
                }
                source->stream_data = oldest->next;
                source->queue_size -= oldest->len;
                refbuf_release (oldest);
            }
        }

        /* release write lock on client_tree */
        avl_tree_unlock(source->client_tree);
    }
    source_shutdown (source);
}
Jack Moffitt's avatar
Jack Moffitt committed
812

Michael Smith's avatar
Michael Smith committed
813

Karl Heyes's avatar
Karl Heyes committed
814
815
/* Tear down a source once its main loop has finished.
 *
 * In order: marks the source as not running, runs the mount's on_disconnect
 * script (if one is configured) and signals the end of the stream to the
 * auth layer, hands remaining listeners to the fallback mount when one is
 * set and can be found, drops the source's stats, clears the source,
 * decrements the global source count and finally unlocks shutdown_rwlock.
 */
static void source_shutdown (source_t *source)
{
    mount_proxy *mount_cfg;

    source->running = 0;
    INFO1("Source \"%s\" exiting", source->mount);

    /* mount specific shutdown handling; the config stays held until the
     * matching release call below */
    mount_cfg = config_find_mount (config_get_config(), source->mount);
    if (mount_cfg != NULL)
    {
        if (mount_cfg->on_disconnect != NULL)
            source_run_script (mount_cfg->on_disconnect, source->mount);
        auth_stream_end (mount_cfg, source->mount);
    }
    config_release_config();

    /* the source is de-activated at this point so no further clients get
     * added; pass the listeners we still have on to the fallback, if any
     */
    if (source->fallback_mount != NULL)
    {
        source_t *target;

        avl_tree_rlock(global.source_tree);
        target = source_find_mount (source->fallback_mount);
        if (target != NULL)
            source_move_clients (source, target);
        avl_tree_unlock (global.source_tree);
    }

    /* drop the stats belonging to this source */
    stats_event(source->mount, NULL, NULL);

    /* the source is not removed from the tree here, it may be a relay and
     * therefore reserved */
    source_clear_source (source);

    /* one source less; update the count and its stat under the global lock */
    global_lock();
    --global.sources;
    stats_event_args (NULL, "sources", "%d", global.sources);
    global_unlock();

    /* let go of the lock so that the main thread can carry on cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
}

862

Jack Moffitt's avatar
Jack Moffitt committed
863
864
static int _compare_clients(void *compare_arg, void *a, void *b)
{
865
866
867
868
869
    client_t *clienta = (client_t *)a;
    client_t *clientb = (client_t *)b;

    connection_t *cona = clienta->con;
    connection_t *conb = clientb->con;
Jack Moffitt's avatar
Jack Moffitt committed
870

871
872
    if (cona->id < conb->id) return -1;
    if (cona->id > conb->id) return 1;
Jack Moffitt's avatar
Jack Moffitt committed
873

874
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
875
876
}

877
/* Callback that performs no work on the supplied key and always reports 1.
 * NOTE(review): presumably handed to a tree removal routine when the client
 * must not be freed along with its node -- confirm against the callers.
 *
 * key: opaque pointer, intentionally ignored.
 * Returns 1 unconditionally.
 */
int source_remove_client(void *key)
{
    (void)key; /* deliberately unused, keeps -Wunused-parameter quiet */
    return 1;
}

static int _free_client(void *key)
{
884
885
    client_t *client = (client_t *)key;

886
887
888
889
890
    /* if no response has been sent then send a 404 */
    if (client->respcode == 0)
        client_send_404 (client, "Mount unavailable");
    else
        client_destroy(client);
891
892
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
893
}
894

895
/* Parse a string of ';' separated "name=value" entries and record each one
 * against the source.
 *
 * For every entry the value is URL-unescaped with util_url_unescape, stored
 * in source->audio_info under the entry name and published as a stat on the
 * source's mount.  Names are limited to 99 characters and values to 199;
 * longer ones are truncated by the scan widths.
 *
 * Entries that do not yield both a name and a value (for example "foo",
 * "foo=" or "=bar") are skipped.  Previously the sscanf result was ignored,
 * so such entries caused uninitialised buffers to be read.
 *
 * source: source whose audio_info dictionary and stats are updated.
 * s:      string to parse, may be NULL or empty in which case nothing is done.
 */
static void _parse_audio_info (source_t *source, const char *s)
{
    const char *start = s;

    while (start != NULL && *start != '\0')
    {
        size_t len;

        if ((s = strchr (start, ';')) == NULL)
            len = strlen (start);
        else
        {
            len = (size_t)(s - start);
            s++; /* skip past the ';' */
        }
        if (len)
        {
            char name[100], value[200];

            /* ';' is excluded from the name so that an entry lacking '='
             * cannot swallow the start of the following entry, and both
             * conversions must succeed before the buffers are used */
            if (sscanf (start, "%99[^=;]=%199[^;\r\n]", name, value) == 2)
            {
                char *esc = util_url_unescape (value);

                if (esc)
                {
                    util_dict_set (source->audio_info, name, esc);
                    stats_event (source->mount, name, esc);
                    free (esc);
                }
            }
        }
        /* s is NULL once the last entry has been handled, ending the loop */
        start = s;
    }
}
926
927


928
/* Apply the mountinfo details to the source */
929
static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
930
{
931
932
933
934
    char *str;
    int val;
    http_parser_t *parser = NULL;

935
    DEBUG1("Applying mount information for \"%s\"", source->mount);
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
    if (mountinfo)
    {
        source->max_listeners = mountinfo->max_listeners;
        source->fallback_override = mountinfo->fallback_override;
        source->hidden = mountinfo->hidden;
    }

    /* if a setting is available in the mount details then use it, else
     * check the parser details. */

    if (source->client)
        parser = source->client->parser;

    /* public */
    if (mountinfo && mountinfo->yp_public >= 0)
        val = mountinfo->yp_public;
    else
    {
        do {
            str = httpp_getvar (parser, "ice-public");
            if (str) break;
            str = httpp_getvar (parser, "icy-pub");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-public");
            if (str) break;
            /* handle header from icecast v2 release */
            str = httpp_getvar (parser, "icy-public");
            if (str) break;
            str = "0";
        } while (0);
        val = atoi (str);
    }
    stats_event_args (source->mount, "public", "%d", val);
    if (source->yp_public != val)
    {
        DEBUG1 ("YP changed to %d", val);
        if (val)
            yp_add (source->mount);
        else
            yp_remove (source->mount);
        source->yp_public = val;
    }

    /* stream name */
    if (mountinfo && mountinfo->stream_name)
        str = mountinfo->stream_name;
    else
    {
        do {
            str = httpp_getvar (parser, "ice-name");
            if (str) break;
            str = httpp_getvar (parser, "icy-name");
            if (str) break;
            str = httpp_getvar (parser, "x-audiocast-name");
            if (str) break;
            str = "Unspecified name";
        } while (0);
    }
    stats_event (source->mount, "server_name", str);

    /* stream description */
    if (mountinfo && mountinfo->stream_description)
        str = mountinfo->stream_description;
    else
    {