slave.c 4.89 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

#include "os.h"

#include "thread.h"
#include "avl.h"
#include "sock.h"
#include "log.h"
#include "httpp.h"

#include "config.h"
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
41
#include "geturl.h"
42
#include "source.h"
Michael Smith's avatar
Michael Smith committed
43
#include "format.h"
44
45
46
47

/* Module name for this file; presumably picked up by the logging macros
 * (DEBUG1/WARN0/ERROR0/ERROR1) used below -- confirm in the log headers. */
#define CATMODULE "slave"

/* Thread entry point, defined at the bottom of this file and started by
 * slave_initialize(). */
static void *_slave_thread(void *arg);
48
/* Handle returned by thread_create() in slave_initialize(); passed to
 * thread_join() in slave_shutdown(). */
thread_type *_slave_thread_id;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/* Non-zero while the slave thread should keep running: set by
 * slave_initialize(), cleared by slave_shutdown(), and tested as the loop
 * condition of _slave_thread(). */
static int _initialized = 0;

/* Start the slave thread.
 *
 * Does nothing when the slave has already been initialized, or when no
 * master server is configured (no slave thread is created in that case).
 */
void slave_initialize(void) {
    /* Short-circuit keeps the original order: the configuration is only
     * consulted when we are not already initialized. */
    if (_initialized || config_get_config()->master_server == NULL)
        return;

    _initialized = 1;
    _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}

/* Stop the slave thread, if one was started.
 *
 * Clears the run flag tested by the thread's main loop and then joins the
 * thread.
 */
void slave_shutdown(void) {
    if (_initialized) {
        _initialized = 0;
        thread_join(_slave_thread_id);
    }
}

Michael Smith's avatar
Michael Smith committed
67
68
69
static void create_relay_stream(char *server, int port, char *mount)
{
    sock_t streamsock;
70
71
72
	char header[4096];
	connection_t *con;
	http_parser_t *parser;
73
    client_t *client;
Michael Smith's avatar
Michael Smith committed
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113

    DEBUG1("Adding source at mountpoint \"%s\"", mount);

	streamsock = sock_connect_wto(server, port, 0);
	if (streamsock == SOCK_ERROR) {
        WARN0("Failed to relay stream from master server");
        return;
	}
	con = create_connection(streamsock, NULL);
	sock_write(streamsock, "GET %s HTTP/1.0\r\n\r\n", mount);
	memset(header, 0, sizeof(header));
	if (util_read_header(con->sock, header, 4096) == 0) {
		connection_close(con);
		return;
	}
	parser = httpp_create_parser();
	httpp_initialize(parser, NULL);
	if(!httpp_parse_response(parser, header, strlen(header), mount)) {
        if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) {
            ERROR1("Error parsing relay request: %s", 
                    httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
        }
        else
            ERROR0("Error parsing relay request");
		connection_close(con);
        httpp_destroy(parser);
        return;
    }

    client = client_create(con, parser);
	if (!connection_create_source(client, con, parser, 
                httpp_getvar(parser, HTTPP_VAR_URI))) {
        client_destroy(client);
	}
    return;
}

static void *_slave_thread(void *arg) {
	sock_t mastersock;
	char buf[256];
114
    int interval = config_get_config()->master_update_interval;
115
116
117
    char *authheader, *data;
    int len;
    char *username = "relay";
118
119
120
121
122
    char *password = config_get_config()->master_password;

    if(password == NULL)
        password = config_get_config()->source_password;

123
124
125
126
127
128
129
130
131
132
133

	while (_initialized) {
        if (config_get_config()->master_update_interval > ++interval) {
		    thread_sleep(1000000);
            continue;
        }
        else
            interval = 0;

		mastersock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0);
		if (mastersock == SOCK_ERROR) {
134
            WARN0("Relay slave failed to contact master server to fetch stream list");
135
136
			continue;
		}
137

138
        len = strlen(username) + strlen(password) + 1;
139
140
141
        authheader = malloc(len+1);
        strcpy(authheader, username);
        strcat(authheader, ":");
142
        strcat(authheader, password);
143
        data = util_base64_encode(authheader);
144
145
146
147
		sock_write(mastersock, 
                "GET /admin/streamlist HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
148
        free(authheader);
149
        free(data);
150
		while (sock_read_line(mastersock, buf, sizeof(buf))) {
151
152
153
154
155
            if(!strlen(buf))
                break;
        }

		while (sock_read_line(mastersock, buf, sizeof(buf))) {
156
157
158
			avl_tree_rlock(global.source_tree);
			if (!source_find_mount(buf)) {
				avl_tree_unlock(global.source_tree);
159

Michael Smith's avatar
Michael Smith committed
160
161
162
163
164
165
166
                create_relay_stream(
                        config_get_config()->master_server,
                        config_get_config()->master_server_port,
                        buf);
			} 
            else
    			avl_tree_unlock(global.source_tree);
167
168
		}
		sock_close(mastersock);
Michael Smith's avatar
Michael Smith committed
169
170
171
172
173
174
175
176
177
178
179
180
181

        /* And now, we process the individual mounts... */
        relay_server *relay = config_get_config()->relay;
        while(relay) {
            avl_tree_rlock(global.source_tree);
            if(!source_find_mount(relay->mount)) {
                avl_tree_unlock(global.source_tree);

                create_relay_stream(relay->server, relay->port, relay->mount);
            }
            else
                avl_tree_unlock(global.source_tree);
        }
182
183
184
185
	}
	thread_exit(0);
	return NULL;
}
Michael Smith's avatar
Michael Smith committed
186