slave.c 4.89 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

#include "os.h"

#include "thread.h"
#include "avl.h"
#include "sock.h"
#include "log.h"
#include "httpp.h"

#include "config.h"
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
41
#include "geturl.h"
42
#include "source.h"
Michael Smith's avatar
Michael Smith committed
43
#include "format.h"
44 45 46 47

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
48
thread_type *_slave_thread_id;
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
static int _initialized = 0;

void slave_initialize(void) {
	if (_initialized) return;
    /* Don't create a slave thread if it isn't configured */
    if (config_get_config()->master_server == NULL)
        return;

	_initialized = 1;
	_slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}

void slave_shutdown(void) {
	if (!_initialized) return;
	_initialized = 0;
	thread_join(_slave_thread_id);
}

Michael Smith's avatar
Michael Smith committed
67 68 69
static void create_relay_stream(char *server, int port, char *mount)
{
    sock_t streamsock;
70 71 72
	char header[4096];
	connection_t *con;
	http_parser_t *parser;
73
    client_t *client;
Michael Smith's avatar
Michael Smith committed
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113

    DEBUG1("Adding source at mountpoint \"%s\"", mount);

	streamsock = sock_connect_wto(server, port, 0);
	if (streamsock == SOCK_ERROR) {
        WARN0("Failed to relay stream from master server");
        return;
	}
	con = create_connection(streamsock, NULL);
	sock_write(streamsock, "GET %s HTTP/1.0\r\n\r\n", mount);
	memset(header, 0, sizeof(header));
	if (util_read_header(con->sock, header, 4096) == 0) {
		connection_close(con);
		return;
	}
	parser = httpp_create_parser();
	httpp_initialize(parser, NULL);
	if(!httpp_parse_response(parser, header, strlen(header), mount)) {
        if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) {
            ERROR1("Error parsing relay request: %s", 
                    httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
        }
        else
            ERROR0("Error parsing relay request");
		connection_close(con);
        httpp_destroy(parser);
        return;
    }

    client = client_create(con, parser);
	if (!connection_create_source(client, con, parser, 
                httpp_getvar(parser, HTTPP_VAR_URI))) {
        client_destroy(client);
	}
    return;
}

static void *_slave_thread(void *arg) {
	sock_t mastersock;
	char buf[256];
114
    int interval = config_get_config()->master_update_interval;
115 116 117
    char *authheader, *data;
    int len;
    char *username = "relay";
118 119 120 121 122
    char *password = config_get_config()->master_password;

    if(password == NULL)
        password = config_get_config()->source_password;

123 124 125 126 127 128 129 130 131 132 133

	while (_initialized) {
        if (config_get_config()->master_update_interval > ++interval) {
		    thread_sleep(1000000);
            continue;
        }
        else
            interval = 0;

		mastersock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0);
		if (mastersock == SOCK_ERROR) {
134
            WARN0("Relay slave failed to contact master server to fetch stream list");
135 136
			continue;
		}
137

138
        len = strlen(username) + strlen(password) + 1;
139 140 141
        authheader = malloc(len+1);
        strcpy(authheader, username);
        strcat(authheader, ":");
142
        strcat(authheader, password);
143
        data = util_base64_encode(authheader);
144 145 146 147
		sock_write(mastersock, 
                "GET /admin/streamlist HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
148
        free(authheader);
149
        free(data);
150
		while (sock_read_line(mastersock, buf, sizeof(buf))) {
151 152 153 154 155
            if(!strlen(buf))
                break;
        }

		while (sock_read_line(mastersock, buf, sizeof(buf))) {
156 157 158
			avl_tree_rlock(global.source_tree);
			if (!source_find_mount(buf)) {
				avl_tree_unlock(global.source_tree);
159

Michael Smith's avatar
Michael Smith committed
160 161 162 163 164 165 166
                create_relay_stream(
                        config_get_config()->master_server,
                        config_get_config()->master_server_port,
                        buf);
			} 
            else
    			avl_tree_unlock(global.source_tree);
167 168
		}
		sock_close(mastersock);
Michael Smith's avatar
Michael Smith committed
169 170 171 172 173 174 175 176 177 178 179 180 181

        /* And now, we process the individual mounts... */
        relay_server *relay = config_get_config()->relay;
        while(relay) {
            avl_tree_rlock(global.source_tree);
            if(!source_find_mount(relay->mount)) {
                avl_tree_unlock(global.source_tree);

                create_relay_stream(relay->server, relay->port, relay->mount);
            }
            else
                avl_tree_unlock(global.source_tree);
        }
182 183 184 185
	}
	thread_exit(0);
	return NULL;
}
Michael Smith's avatar
Michael Smith committed
186