slave.c 5.58 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

#include "os.h"

#include "thread.h"
#include "avl.h"
#include "sock.h"
#include "log.h"
#include "httpp.h"

#include "config.h"
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
41
#include "geturl.h"
42
#include "source.h"
Michael Smith's avatar
Michael Smith committed
43
#include "format.h"
44 45 46 47

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
48
thread_type *_slave_thread_id;
49 50 51 52 53
static int _initialized = 0;

void slave_initialize(void) {
	if (_initialized) return;
    /* Don't create a slave thread if it isn't configured */
Michael Smith's avatar
Michael Smith committed
54 55
    if (config_get_config()->master_server == NULL && 
            config_get_config()->relay == NULL)
56 57 58 59 60 61 62 63 64 65 66 67
        return;

	_initialized = 1;
	_slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}

/* Stop the slave thread.
 *
 * Clears the _initialized flag and then joins the thread.  Does nothing
 * when the thread was never started.
 */
void slave_shutdown(void)
{
    if (_initialized) {
        _initialized = 0;
        thread_join(_slave_thread_id);
    }
}

68
static void create_relay_stream(char *server, int port, 
69
        char *remotemount, char *localmount, int mp3)
Michael Smith's avatar
Michael Smith committed
70 71
{
    sock_t streamsock;
72 73 74
	char header[4096];
	connection_t *con;
	http_parser_t *parser;
75
    client_t *client;
Michael Smith's avatar
Michael Smith committed
76

77 78 79 80
    if(!localmount)
        localmount = remotemount;

    DEBUG1("Adding source at mountpoint \"%s\"", localmount);
Michael Smith's avatar
Michael Smith committed
81 82 83

	streamsock = sock_connect_wto(server, port, 0);
	if (streamsock == SOCK_ERROR) {
84
        WARN2("Failed to relay stream from master server, couldn't connect to http://%s:%d", server, port);
Michael Smith's avatar
Michael Smith committed
85 86 87
        return;
	}
	con = create_connection(streamsock, NULL);
88 89 90 91 92 93 94
    if(mp3) {
    	sock_write(streamsock, "GET %s HTTP/1.0\r\nIcy-MetaData: 1\r\n", 
                remotemount);
    }
    else {
    	sock_write(streamsock, "GET %s HTTP/1.0\r\n\r\n", remotemount);
    }
Michael Smith's avatar
Michael Smith committed
95 96 97 98 99 100 101
	memset(header, 0, sizeof(header));
	if (util_read_header(con->sock, header, 4096) == 0) {
		connection_close(con);
		return;
	}
	parser = httpp_create_parser();
	httpp_initialize(parser, NULL);
102
	if(!httpp_parse_response(parser, header, strlen(header), localmount)) {
Michael Smith's avatar
Michael Smith committed
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
        if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) {
            ERROR1("Error parsing relay request: %s", 
                    httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
        }
        else
            ERROR0("Error parsing relay request");
		connection_close(con);
        httpp_destroy(parser);
        return;
    }

    client = client_create(con, parser);
	if (!connection_create_source(client, con, parser, 
                httpp_getvar(parser, HTTPP_VAR_URI))) {
        client_destroy(client);
	}
    return;
}

static void *_slave_thread(void *arg) {
	sock_t mastersock;
	char buf[256];
125
    int interval = config_get_config()->master_update_interval;
126 127 128
    char *authheader, *data;
    int len;
    char *username = "relay";
129
    char *password = config_get_config()->master_password;
Michael Smith's avatar
Michael Smith committed
130
    relay_server *relay;
131 132 133 134

    if(password == NULL)
        password = config_get_config()->source_password;

135 136 137 138 139 140 141 142 143

	while (_initialized) {
        if (config_get_config()->master_update_interval > ++interval) {
		    thread_sleep(1000000);
            continue;
        }
        else
            interval = 0;

Michael Smith's avatar
Michael Smith committed
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
        if(config_get_config()->master_server != NULL) {
		    mastersock = sock_connect_wto(config_get_config()->master_server, 
                    config_get_config()->master_server_port, 0);
    		if (mastersock == SOCK_ERROR) {
                WARN0("Relay slave failed to contact master server to fetch stream list");
		    	continue;
    		}

            len = strlen(username) + strlen(password) + 1;
            authheader = malloc(len+1);
            strcpy(authheader, username);
            strcat(authheader, ":");
            strcat(authheader, password);
            data = util_base64_encode(authheader);
		    sock_write(mastersock, 
                    "GET /admin/streamlist HTTP/1.0\r\n"
                    "Authorization: Basic %s\r\n"
                    "\r\n", data);
            free(authheader);
            free(data);
    		while (sock_read_line(mastersock, buf, sizeof(buf))) {
                if(!strlen(buf))
                    break;
            }
168

Michael Smith's avatar
Michael Smith committed
169 170 171 172 173 174 175 176
	    	while (sock_read_line(mastersock, buf, sizeof(buf))) {
		    	avl_tree_rlock(global.source_tree);
			    if (!source_find_mount(buf)) {
				    avl_tree_unlock(global.source_tree);

                    create_relay_stream(
                            config_get_config()->master_server,
                            config_get_config()->master_server_port,
177
                            buf, NULL, 0);
Michael Smith's avatar
Michael Smith committed
178 179 180 181 182 183
    			} 
                else
    	    		avl_tree_unlock(global.source_tree);
    		}
	    	sock_close(mastersock);
        }
Michael Smith's avatar
Michael Smith committed
184 185

        /* And now, we process the individual mounts... */
Michael Smith's avatar
Michael Smith committed
186
        relay = config_get_config()->relay;
Michael Smith's avatar
Michael Smith committed
187 188
        while(relay) {
            avl_tree_rlock(global.source_tree);
189
            if(!source_find_mount(relay->localmount)) {
Michael Smith's avatar
Michael Smith committed
190 191
                avl_tree_unlock(global.source_tree);

192
                create_relay_stream(relay->server, relay->port, relay->mount,
193
                        relay->localmount, relay->mp3metadata);
Michael Smith's avatar
Michael Smith committed
194 195 196
            }
            else
                avl_tree_unlock(global.source_tree);
Michael Smith's avatar
Michael Smith committed
197
            relay = relay->next;
Michael Smith's avatar
Michael Smith committed
198
        }
199 200 201 202
	}
	thread_exit(0);
	return NULL;
}
Michael Smith's avatar
Michael Smith committed
203