/* fserve.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
5
#include <sys/poll.h>
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44

#ifndef _WIN32
#include <unistd.h>
#include <sys/time.h>
#include <sys/socket.h>
#else
#include <winsock2.h>
#include <windows.h>
#endif

#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "sock.h"

#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "format.h"
#include "log.h"
#include "logging.h"
#include "config.h"
#include "util.h"

#include "fserve.h"

/* Category name for this file.
 * NOTE(review): presumably consumed by the logging macros (e.g. DEBUG0)
 * to tag messages -- confirm against logging.h. */
#undef CATMODULE
#define CATMODULE "fserve"

/* Size in bytes of each client's read buffer; also the largest chunk
 * requested from the file in a single fread(). */
#define BUFSIZE 4096

/* Clients that the file-serving thread is currently sending data to. */
static avl_tree *client_tree;
/* Clients queued by fserve_client_create(); the file-serving thread moves
 * them into client_tree on its next pass. */
static avl_tree *pending_tree;

/* Signalled when a client is queued and on shutdown; the file-serving
 * thread waits on it while both trees are empty. */
static cond_t fserv_cond;
/* Handle of the file-serving thread, joined in fserve_shutdown(). */
static thread_t *fserv_thread;
/* Set to 1 by fserve_initialize() and to 0 by fserve_shutdown(); the
 * thread's loops run for as long as it is non-zero. */
static int run_fserv;
45 46 47 48 49
/* Number of entries in client_tree, maintained by the file-serving thread
 * as it inserts and deletes clients. */
static int fserve_clients;
/* Non-zero when client_tree has changed since the poll array was last
 * rebuilt by wait_for_fds(). */
static int client_tree_changed=0;

/* Array handed to poll(): one entry per client socket, (re)built in
 * wait_for_fds(). */
static struct pollfd *ufds = NULL;
/* Number of entries of ufds passed to poll(). */
static int ufdssize = 0;
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85

/* avl tree helpers */
/* Ordering callback handed to avl_tree_new() for both trees. */
static int _compare_clients(void *compare_arg, void *a, void *b);
/* Delete callback that leaves the key untouched (the key lives on elsewhere). */
static int _remove_client(void *key);
/* Delete callback that destroys the key and updates the client counters. */
static int _free_client(void *key);
/* Entry point of the file-serving thread started by fserve_initialize(). */
void *fserv_thread_function(void *arg);

/*
 * Bring up the file-serving subsystem.
 *
 * Does nothing unless the configuration has file serving enabled.
 * Otherwise it creates the active and pending client trees and the
 * condition variable, raises the run flag, and starts the serving thread.
 */
void fserve_initialize(void)
{
    if(config_get_config()->fileserve) {
        client_tree = avl_tree_new(_compare_clients, NULL);
        pending_tree = avl_tree_new(_compare_clients, NULL);
        thread_cond_create(&fserv_cond);

        /* Raise the flag before the thread exists so its main loop runs. */
        run_fserv = 1;

        fserv_thread = thread_create("File Serving Thread",
                fserv_thread_function, NULL, THREAD_ATTACHED);
    }
}

void fserve_shutdown(void)
{
    if(!config_get_config()->fileserve)
        return;

    run_fserv = 0;
    thread_cond_signal(&fserv_cond);
    thread_join(fserv_thread);

    thread_cond_destroy(&fserv_cond);
    avl_tree_free(client_tree, _free_client);
    avl_tree_free(pending_tree, _free_client);
}

86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
/*
 * Block until there is something for the file-serving thread to do.
 *
 * When client_tree has changed, the poll array is rebuilt with one POLLOUT
 * entry per client socket. The function then polls in 200ms slices and
 * returns as soon as
 *   - poll() reports at least one ready descriptor, or
 *   - a poll slice ends without a ready descriptor and the pending tree is
 *     non-empty, or
 *   - run_fserv has been cleared.
 */
static void wait_for_fds(void) {
    avl_node *client_node;
    fserve_t *client;
    struct pollfd *resized;
    int wanted;
    int i;

    while(run_fserv) {
        if(client_tree_changed) {
            client_tree_changed = 0;

            wanted = fserve_clients;
            if(wanted < 0)
                wanted = 0;

            /* Skip realloc() for a zero count: a zero-size realloc may
             * free the block, which would leave ufds dangling. */
            if(wanted > 0) {
                resized = realloc(ufds, (size_t)wanted * sizeof(*ufds));
                if(resized)
                    ufds = resized;
                else if(wanted > ufdssize) {
                    /* Could not grow: the old block is still valid, so
                     * poll as many clients as it holds and try again on
                     * the next pass. */
                    wanted = ufdssize;
                    client_tree_changed = 1;
                }
            }

            i = 0;
            avl_tree_rlock(client_tree);
            client_node = avl_get_first(client_tree);
            while(client_node && i < wanted) {
                client = client_node->key;
                ufds[i].fd = client->client->con->sock;
                ufds[i].events = POLLOUT;
                ufds[i].revents = 0;
                /* One slot per client: without this step every client
                 * would overwrite slot 0. */
                i++;
                client_node = avl_get_next(client_node);
            }
            avl_tree_unlock(client_tree);

            /* Only poll the slots that were actually filled in. */
            ufdssize = i;
        }

        if(poll(ufds, ufdssize, 200) > 0) {
            return;
        }
        else {
            /* Nothing writable yet; still return if a new client has been
             * queued so the caller can pick it up. */
            avl_tree_rlock(pending_tree);
            client_node = avl_get_first(pending_tree);
            avl_tree_unlock(pending_tree);
            if(client_node)
                return;
        }
    }
}


122 123 124 125 126 127 128 129 130 131 132
void *fserv_thread_function(void *arg)
{
    avl_node *client_node, *pending_node;
    fserve_t *client;
    int sbytes, bytes;

    while (run_fserv) {
        avl_tree_rlock(client_tree);

        client_node = avl_get_first(client_tree);
        if(!client_node) {
133
            avl_tree_rlock(pending_tree);
134 135 136
            pending_node = avl_get_first(pending_tree);
            if(!pending_node) {
                /* There are no current clients. Wait until there are... */
137
                avl_tree_unlock(pending_tree);
138 139 140 141
                avl_tree_unlock(client_tree);
                thread_cond_wait(&fserv_cond);
                continue;
            }
142
            avl_tree_unlock(pending_tree);
143 144
        }

145 146 147 148 149 150 151
        /* This isn't hugely efficient, but it'll do for now */
        avl_tree_unlock(client_tree);
        wait_for_fds();

        avl_tree_rlock(client_tree);
        client_node = avl_get_first(client_tree);

152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
        while(client_node) {
            avl_node_wlock(client_node);

            client = (fserve_t *)client_node->key;

            if(client->offset >= client->datasize) {
                /* Grab a new chunk */
                bytes = fread(client->buf, 1, BUFSIZE, client->file);
                if(bytes <= 0) {
                    client->client->con->error = 1;
                    avl_node_unlock(client_node);
                    client_node = avl_get_next(client_node);
                    continue;
                }
                client->offset = 0;
                client->datasize = bytes;
            }

            /* Now try and send current chunk. */
            sbytes = sock_write_bytes(client->client->con->sock, 
                    &client->buf[client->offset], 
                    client->datasize - client->offset);

            // TODO: remove clients if they take too long.
            if(sbytes >= 0) {
                client->offset += sbytes;
                client->client->con->sent_bytes += sbytes;
            }
            else if(!sock_recoverable(sock_error())) {
                DEBUG0("Fileserving client had fatal error, disconnecting");
                client->client->con->error = 1;
            }
            else
                DEBUG0("Fileserving client had recoverable error");

            avl_node_unlock(client_node);
            client_node = avl_get_next(client_node);
        }

        avl_tree_unlock(client_tree);

        /* Now we need a write lock instead, to delete done clients. */
        avl_tree_wlock(client_tree);

        client_node = avl_get_first(client_tree);
        while(client_node) {
            client = (fserve_t *)client_node->key;
            if(client->client->con->error) {
200
                fserve_clients--;
201 202
                client_node = avl_get_next(client_node);
                avl_delete(client_tree, (void *)client, _free_client);
203
                client_tree_changed = 1;
204 205 206 207 208 209 210 211 212 213 214 215
                continue;
            }
            client_node = avl_get_next(client_node);
        }

        avl_tree_wlock(pending_tree);

        /* And now insert new clients. */
        client_node = avl_get_first(pending_tree);
        while(client_node) {
            client = (fserve_t *)client_node->key;
            avl_insert(client_tree, client);
216 217 218
            client_tree_changed = 1;
            fserve_clients++;
            stats_event_inc(NULL, "clients");
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
            client_node = avl_get_next(client_node);

        }

        /* clear pending */
        while(avl_get_first(pending_tree)) {
            avl_delete(pending_tree, avl_get_first(pending_tree)->key, 
                    _remove_client);
        }

        avl_tree_unlock(pending_tree);
        avl_tree_unlock(client_tree);
    }

    /* Shutdown path */

    avl_tree_wlock(pending_tree);
    while(avl_get_first(pending_tree))
        avl_delete(pending_tree, avl_get_first(pending_tree)->key, 
                _free_client);
    avl_tree_unlock(pending_tree);

    avl_tree_wlock(client_tree);
    while(avl_get_first(client_tree))
        avl_delete(client_tree, avl_get_first(client_tree)->key, 
                _free_client);
    avl_tree_unlock(client_tree);

    thread_exit(0);
    return NULL;
}

251
/*
 * Pick the Content-Type header value for a file, based on the extension
 * that util_get_extension() reports for path. Unrecognised extensions get
 * "application/octet-stream". The returned string is a constant and must
 * not be freed or modified.
 */
static char *fserve_content_type(char *path)
{
    /* TODO Add more types */
    static const struct {
        const char *extension;
        char *mime_type;
    } known_types[] = {
        { "ogg",  "application/x-ogg" },
        { "mp3",  "audio/mpeg" },
        { "html", "text/html" },
        { "txt",  "text/plain" }
    };
    char *ext = util_get_extension(path);
    size_t idx;

    for(idx = 0; idx < sizeof(known_types) / sizeof(known_types[0]); idx++) {
        if(strcmp(ext, known_types[idx].extension) == 0)
            return known_types[idx].mime_type;
    }

    return "application/octet-stream";
}

/*
 * Release everything owned by a file-serving client: its read buffer, its
 * open file, the wrapped HTTP client (via client_destroy) and finally the
 * structure itself. A NULL client is ignored.
 */
static void fserve_client_destroy(fserve_t *client)
{
    if(client == NULL)
        return;

    /* free() accepts NULL, so the buffer needs no guard. */
    free(client->buf);

    if(client->file != NULL)
        fclose(client->file);

    if(client->client != NULL)
        client_destroy(client->client);

    free(client);
}

/*
 * Start serving the file at path to httpclient.
 *
 * On success the response header is written, the socket is switched to
 * non-blocking mode, the new client is queued on the pending tree and the
 * file-serving thread is signalled.
 *
 * Returns 0 on success and -1 on failure: out of memory, file not
 * readable (a 404 is sent), or server full (a 504 response is written).
 * On the server-full and out-of-memory paths httpclient is destroyed
 * here; on the 404 path it is handed to client_send_404().
 */
int fserve_client_create(client_t *httpclient, char *path)
{
    fserve_t *client;
    int bytes;

    client = calloc(1, sizeof(*client));
    if(!client) {
        /* NOTE(review): the server-full path below also destroys the HTTP
         * client before returning -1, so do the same here -- confirm that
         * callers never reuse httpclient after a failure. */
        client_destroy(httpclient);
        return -1;
    }

    client->file = fopen(path, "rb");
    if(!client->file) {
        client_send_404(httpclient, "File not readable");
        /* Nothing but the bare structure has been set up yet (calloc left
         * every member zero), so releasing it is all that is needed. */
        free(client);
        return -1;
    }

    client->client = httpclient;
    client->offset = 0;
    client->datasize = 0;
    client->buf = malloc(BUFSIZE);
    if(!client->buf) {
        /* Without a buffer the serving thread would fread() into NULL.
         * This closes the file and destroys httpclient as well. */
        fserve_client_destroy(client);
        return -1;
    }

    global_lock();
    if(global.clients >= config_get_config()->client_limit) {
        httpclient->respcode = 504;
        bytes = sock_write(httpclient->con->sock,
                "HTTP/1.0 504 Server Full\r\n"
                "Content-Type: text/html\r\n\r\n"
                "<b>Server is full, try again later.</b>\r\n");
        if(bytes > 0) httpclient->con->sent_bytes = bytes;
        fserve_client_destroy(client);
        global_unlock();
        return -1;
    }
    global.clients++;
    global_unlock();

    httpclient->respcode = 200;
    bytes = sock_write(httpclient->con->sock,
            "HTTP/1.0 200 OK\r\n"
            "Content-Type: %s\r\n\r\n",
            fserve_content_type(path));
    if(bytes > 0) httpclient->con->sent_bytes = bytes;

    /* The serving thread multiplexes many clients, so their sockets must
     * not block it. */
    sock_set_blocking(client->client->con->sock, SOCK_NONBLOCK);

    avl_tree_wlock(pending_tree);
    avl_insert(pending_tree, client);
    avl_tree_unlock(pending_tree);

    /* Wake the serving thread in case it is idle. */
    thread_cond_signal(&fserv_cond);

    return 0;
}

static int _compare_clients(void *compare_arg, void *a, void *b)
{
	connection_t *cona = (connection_t *)a;
    connection_t *conb = (connection_t *)b;

	if (cona->id < conb->id) return -1;
	if (cona->id > conb->id) return 1;

	return 0;
}

/*
 * Delete callback for tree entries whose key must stay alive: it leaves
 * the key untouched and always returns 1.
 */
static int _remove_client(void *key)
{
    (void)key;

    return 1;
}

/*
 * Delete callback that disposes of a tree entry for good: destroys the
 * fserve client stored as key, decrements the global client count under
 * the global lock, and decrements the "clients" stat. Always returns 1.
 */
static int _free_client(void *key)
{
    fserve_client_destroy((fserve_t *)key);

    global_lock();
    global.clients--;
    global_unlock();

    stats_event_dec(NULL, "clients");

    return 1;
}