slave.c 5.41 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

#include "os.h"

#include "thread.h"
#include "avl.h"
#include "sock.h"
#include "log.h"
#include "httpp.h"

#include "config.h"
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
41
#include "geturl.h"
42
#include "source.h"
Michael Smith's avatar
Michael Smith committed
43
#include "format.h"
44
45
46
47

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
48
thread_type *_slave_thread_id;
49
50
51
52
53
static int _initialized = 0;

void slave_initialize(void) {
	if (_initialized) return;
    /* Don't create a slave thread if it isn't configured */
Michael Smith's avatar
Michael Smith committed
54
55
    if (config_get_config()->master_server == NULL && 
            config_get_config()->relay == NULL)
56
57
58
59
60
61
62
63
64
65
66
67
        return;

	_initialized = 1;
	_slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}

void slave_shutdown(void) {
	if (!_initialized) return;
	_initialized = 0;
	thread_join(_slave_thread_id);
}

68
69
static void create_relay_stream(char *server, int port, 
        char *remotemount, char *localmount)
Michael Smith's avatar
Michael Smith committed
70
71
{
    sock_t streamsock;
72
73
74
	char header[4096];
	connection_t *con;
	http_parser_t *parser;
75
    client_t *client;
Michael Smith's avatar
Michael Smith committed
76

77
78
79
80
    if(!localmount)
        localmount = remotemount;

    DEBUG1("Adding source at mountpoint \"%s\"", localmount);
Michael Smith's avatar
Michael Smith committed
81
82
83

	streamsock = sock_connect_wto(server, port, 0);
	if (streamsock == SOCK_ERROR) {
84
        WARN2("Failed to relay stream from master server, couldn't connect to http://%s:%d", server, port);
Michael Smith's avatar
Michael Smith committed
85
86
87
        return;
	}
	con = create_connection(streamsock, NULL);
88
	sock_write(streamsock, "GET %s HTTP/1.0\r\n\r\n", remotemount);
Michael Smith's avatar
Michael Smith committed
89
90
91
92
93
94
95
	memset(header, 0, sizeof(header));
	if (util_read_header(con->sock, header, 4096) == 0) {
		connection_close(con);
		return;
	}
	parser = httpp_create_parser();
	httpp_initialize(parser, NULL);
96
	if(!httpp_parse_response(parser, header, strlen(header), localmount)) {
Michael Smith's avatar
Michael Smith committed
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
        if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) {
            ERROR1("Error parsing relay request: %s", 
                    httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
        }
        else
            ERROR0("Error parsing relay request");
		connection_close(con);
        httpp_destroy(parser);
        return;
    }

    client = client_create(con, parser);
	if (!connection_create_source(client, con, parser, 
                httpp_getvar(parser, HTTPP_VAR_URI))) {
        client_destroy(client);
	}
    return;
}

static void *_slave_thread(void *arg) {
	sock_t mastersock;
	char buf[256];
119
    int interval = config_get_config()->master_update_interval;
120
121
122
    char *authheader, *data;
    int len;
    char *username = "relay";
123
    char *password = config_get_config()->master_password;
Michael Smith's avatar
Michael Smith committed
124
    relay_server *relay;
125
126
127
128

    if(password == NULL)
        password = config_get_config()->source_password;

129
130
131
132
133
134
135
136
137

	while (_initialized) {
        if (config_get_config()->master_update_interval > ++interval) {
		    thread_sleep(1000000);
            continue;
        }
        else
            interval = 0;

Michael Smith's avatar
Michael Smith committed
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
        if(config_get_config()->master_server != NULL) {
		    mastersock = sock_connect_wto(config_get_config()->master_server, 
                    config_get_config()->master_server_port, 0);
    		if (mastersock == SOCK_ERROR) {
                WARN0("Relay slave failed to contact master server to fetch stream list");
		    	continue;
    		}

            len = strlen(username) + strlen(password) + 1;
            authheader = malloc(len+1);
            strcpy(authheader, username);
            strcat(authheader, ":");
            strcat(authheader, password);
            data = util_base64_encode(authheader);
		    sock_write(mastersock, 
                    "GET /admin/streamlist HTTP/1.0\r\n"
                    "Authorization: Basic %s\r\n"
                    "\r\n", data);
            free(authheader);
            free(data);
    		while (sock_read_line(mastersock, buf, sizeof(buf))) {
                if(!strlen(buf))
                    break;
            }
162

Michael Smith's avatar
Michael Smith committed
163
164
165
166
167
168
169
170
	    	while (sock_read_line(mastersock, buf, sizeof(buf))) {
		    	avl_tree_rlock(global.source_tree);
			    if (!source_find_mount(buf)) {
				    avl_tree_unlock(global.source_tree);

                    create_relay_stream(
                            config_get_config()->master_server,
                            config_get_config()->master_server_port,
171
                            buf, NULL);
Michael Smith's avatar
Michael Smith committed
172
173
174
175
176
177
    			} 
                else
    	    		avl_tree_unlock(global.source_tree);
    		}
	    	sock_close(mastersock);
        }
Michael Smith's avatar
Michael Smith committed
178
179

        /* And now, we process the individual mounts... */
Michael Smith's avatar
Michael Smith committed
180
        relay = config_get_config()->relay;
Michael Smith's avatar
Michael Smith committed
181
182
        while(relay) {
            avl_tree_rlock(global.source_tree);
183
            if(!source_find_mount(relay->localmount)) {
Michael Smith's avatar
Michael Smith committed
184
185
                avl_tree_unlock(global.source_tree);

186
187
                create_relay_stream(relay->server, relay->port, relay->mount,
                        relay->localmount);
Michael Smith's avatar
Michael Smith committed
188
189
190
            }
            else
                avl_tree_unlock(global.source_tree);
Michael Smith's avatar
Michael Smith committed
191
            relay = relay->next;
Michael Smith's avatar
Michael Smith committed
192
        }
193
194
195
196
	}
	thread_exit(0);
	return NULL;
}
Michael Smith's avatar
Michael Smith committed
197