From 6a931a1185aa8170636b8f55c38ad91af6f897f6 Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Thu, 18 Dec 2014 12:31:18 +0100 Subject: KIRO Server can now issue a client reallocation command --- src/kiro-rdma.h | 3 +- src/kiro-server.c | 157 +++++++++++++++++++++++++++++++++++++++++++++++++++--- src/kiro-server.h | 13 +++++ 3 files changed, 164 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/kiro-rdma.h b/src/kiro-rdma.h index 5b4895f..6baa017 100644 --- a/src/kiro-rdma.h +++ b/src/kiro-rdma.h @@ -56,7 +56,8 @@ struct kiro_ctrl_msg { KIRO_ACK_RDMA, // acknowledge RDMA Request and provide Memory Region Information KIRO_REJ_RDMA, // RDMA Request rejected :( (peer_mri will be invalid) KIRO_PING, // PING Message - KIRO_PONG // PONG Message (PING reply) + KIRO_PONG, // PONG Message (PING reply) + KIRO_REALLOC // Used by the server to notify the client about a new peer_mri } msg_type; struct ibv_mr peer_mri; diff --git a/src/kiro-server.c b/src/kiro-server.c index fadc329..0d62ae7 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -67,12 +67,22 @@ struct _KiroServerPrivate { G_DEFINE_TYPE (KiroServer, kiro_server, G_TYPE_OBJECT); +// List of clients that were asked to realloc their memory +GList *realloc_clients; + +// Temporary lock for connecting clients +G_LOCK_DEFINE (connection_handling); + +// Used to prevent raceconditions during realloc timeout +G_LOCK_DEFINE (realloc_timeout); + struct kiro_client_connection { guint id; // Client identification (Easy access) GIOChannel *rcv_ec; // GLib IO Channel encapsulation for receive completions for the client guint source_id; // ID of the source created by g_io_add_watch, needed to remove it again struct rdma_cm_id *conn; // Connection Manager ID of the client + struct kiro_rdma_mem *backup_mri; // Backup MRI for reallocation }; @@ -177,7 +187,7 @@ error: static int -welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size) +grant_client_access (struct rdma_cm_id *client, void *mem, size_t mem_size, guint type) { struct kiro_connection_context *ctx = (struct kiro_connection_context *) (client->context); ctx->rdma_mr = (struct kiro_rdma_mem *)g_try_malloc0 (sizeof (struct kiro_rdma_mem)); @@ -199,7 +209,7 @@ welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size) struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem); - msg->msg_type = KIRO_ACK_RDMA; + msg->msg_type = type; msg->peer_mri = * (ctx->rdma_mr->mr); @@ -289,17 +299,22 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) (void) source; (void) condition; + if (!G_TRYLOCK (connection_handling)) { + // Unsafe to handle connection management right now. + // Wait for next dispatch. + return TRUE; + } + KiroServerPrivate *priv = (KiroServerPrivate *)data; struct rdma_cm_event *active_event; if (0 <= rdma_get_cm_event (priv->ec, &active_event)) { - //Disable cancellation to prevent undefined states during shutdown struct rdma_cm_event *ev = g_try_malloc (sizeof (*active_event)); if (!ev) { g_critical ("Unable to allocate memory for Event handling!"); rdma_ack_cm_event (active_event); - return FALSE; + goto exit; } memcpy (ev, active_event, sizeof (*active_event)); @@ -311,7 +326,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) //Don't connect this client any more. //Sorry mate! rdma_reject (ev->id, NULL, 0); - return TRUE; + goto exit; } do { @@ -327,7 +342,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) goto fail; // Post a welcoming "Receive" for handshaking - if (welcome_client (ev->id, priv->mem, priv->mem_size)) + if (grant_client_access (ev->id, priv->mem, priv->mem_size, KIRO_ACK_RDMA)) goto fail; ibv_req_notify_cq (ev->id->recv_cq, 0); // Make the respective Queue push events onto the channel @@ -354,6 +369,8 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) fail: g_warning ("Failed to accept client connection: %s", strerror (errno)); + if (errno == EINVAL) + g_message ("This might happen if the client pulls back the connection request before the server can handle it."); } while(0); } @@ -361,7 +378,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); if (!ctx->container) { g_debug ("Got disconnect request from unknown client"); - return FALSE; + goto exit; } GList *client = g_list_find (priv->clients, (gconstpointer) ctx->container); @@ -390,8 +407,11 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) g_debug ("Connection closed successfully. %u connected clients remaining", g_list_length (priv->clients)); } +exit: g_free (ev); } + + G_UNLOCK (connection_handling); return TRUE; } @@ -503,7 +523,7 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void } -static void +void disconnect_client (gpointer data, gpointer user_data) { (void)user_data; @@ -529,6 +549,127 @@ disconnect_client (gpointer data, gpointer user_data) } +/* + * NOTE: + * When sending the reconnection request to the clients, we try to copy all the + * clients from the current pirv->clients list to the new realloc_clients list. + * We will then remove eache ACKed client from the _OLD_ list, and will use that + * list as well to determine clients that have not responded in time. + * Afterwards, we will swap the old and the new list pointers to restore the + * correct list naming. + * + * This is a trick to circumvent problems with clients that, for whatever + * reason, can not be copied to the 'new' list. If that happens, we would not + * even recognize that a client was not informed, because it never appears in + * the new list to begin with, and the client would therefore survive the + * timeout for reallocation. That clients peer_mri would then never be unpinned + * (unless the server stops or the client disconnects), causing MASSIVE memory + * leakage. Also, the client would continue to read stale data in the best case, + * or newly allocated garbage in the worst case. + * + * Since all currently connected clients are guaranteed to be stored in the old + * list, using that one to detect failed ACKs makes much more sense, since + * clients that failed to be informed of reallocation would simply never send an + * ACK, stay on the list, and then securely be disconnected after the timeout. + **/ + +void +request_client_realloc (gpointer data, gpointer user_data) { + + if (!data || !user_data) { + g_critical ("Either client or server pointer was lost during client reconnect!"); + // The client will remain in the old list, never receive the + // reallocation request, therefore never send an ACK and therefore will + // be forcefully disconnected once the timeout happens. See Note above. + return; + } + + realloc_clients = g_list_append (realloc_clients, data); + + struct kiro_client_connection *cc = (struct kiro_client_connection *)data; + struct kiro_connection_context *ctx = (struct kiro_connection_context *)cc->conn->context; + + // user_data is used to pass the information about the new RDMA memory to + // this function. It is encapsulated in a kiro_rdma_mem struct. + struct kiro_rdma_mem *new_rdma_mem = (struct kiro_rdma_mem *)user_data; + + cc->backup_mri = ctx->rdma_mr; + ctx->rdma_mr = NULL; + g_debug ("Requesting REALLOC for client %i", cc->id); + if (grant_client_access (cc->conn, new_rdma_mem->mem, new_rdma_mem->size, KIRO_REALLOC)) { + ctx->rdma_mr = cc->backup_mri; + cc->backup_mri = NULL; + g_warning ("Failed to request REALLOC for client %i", cc->id); + return; + } + g_debug ("Client %i REALLOC request sent.", cc->id); +} + + +volatile gboolean timeout_done = FALSE; + +gboolean +client_realloc_timeout (gpointer data) { + + (void) data; + g_debug ("TIMEOUT OCCURED"); + timeout_done = TRUE; + return TRUE; +} + + +void +kiro_server_realloc (KiroServer *self, void *mem, size_t size) { + + if (!self) + return; + + struct kiro_rdma_mem rdma_mem; + rdma_mem.mem = mem; + rdma_mem.size = size; + + KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE (self); + + G_LOCK (connection_handling); + g_list_foreach (priv->clients, request_client_realloc, &rdma_mem); + + guint timeout = g_timeout_add_seconds (2, client_realloc_timeout, NULL); + + timeout_done = FALSE; + while (!timeout_done) {}; + + // Remove the timeout + GSource *timeout_source = g_main_context_find_source_by_id (NULL, timeout); + if (timeout_source) { + g_source_destroy (timeout_source); + } + + G_LOCK (realloc_timeout); + GList *current = g_list_first (priv->clients); + while (current) { + GList *client = g_list_find (realloc_clients, current->data); + if (client) { + struct kiro_client_connection *cc = (struct kiro_client_connection *)client->data; + g_debug ("Client %i did not ACK the REALLOC request in time.", cc->id); + disconnect_client (client->data, NULL); + realloc_clients = g_list_delete_link (realloc_clients, client); + } + current = g_list_next (current); + } + g_list_free (priv->clients); + priv->clients = realloc_clients; + realloc_clients = NULL; + G_UNLOCK (realloc_timeout); + + // CHANGE INTERNAL POINTERS FOR MEM!! + // priv->mem = mem, etc.. + + G_UNLOCK (connection_handling); + + +} + + void kiro_server_stop (KiroServer *self) { diff --git a/src/kiro-server.h b/src/kiro-server.h index 7e42159..655140a 100644 --- a/src/kiro-server.h +++ b/src/kiro-server.h @@ -129,6 +129,19 @@ void kiro_server_free (KiroServer *server); */ int kiro_server_start (KiroServer *server, const char *bind_addr, const char *bind_port, void *mem, size_t mem_size); + +/** + * kiro_server_realloc - Change the memory that is provided by the server + * @server: #KiroServer to perform the operation on + * @mem: (transfer none): Pointer to the memory that is to be provided + * @mem_size: Size in bytes of the given memory + * Description: + * Changes the memory that is provided by the server. All connected clients + * will automatically be informed about this change. + */ +void kiro_server_realloc (KiroServer *self, void* mem, size_t mem_size); + + /** * kiro_server_stop - Stops the server * @server: #KiroServer to perform the operation on -- cgit v1.2.3 From 5ad42a8bd4ec754b9c33f9c0b22dceb0e812c4a5 Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Thu, 15 Jan 2015 14:48:08 +0100 Subject: Fixed race conditions in kiro_server_realloc KIRO client now reacts to server realloc --- src/kiro-client.c | 75 ++++++++++++++---- src/kiro-rdma.h | 4 +- src/kiro-server.c | 226 +++++++++++++++++++++++++++++++++++++----------------- 3 files changed, 219 insertions(+), 86 deletions(-) (limited to 'src') diff --git a/src/kiro-client.c b/src/kiro-client.c index 6d0d3f5..3cf6e62 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -64,11 +64,32 @@ struct _KiroClientPrivate { G_DEFINE_TYPE (KiroClient, kiro_client, G_TYPE_OBJECT); - // Temporary storage and lock for PING timing G_LOCK_DEFINE (ping_time); volatile struct timeval ping_time; +G_LOCK_DEFINE (sync_lock); + +static inline gboolean +send_msg (struct rdma_cm_id *id, struct kiro_rdma_mem *r) +{ + gboolean retval = TRUE; + G_LOCK (sync_lock); + if (rdma_post_send (id, id, r->mem, r->size, r->mr, IBV_SEND_SIGNALED)) { + retval = FALSE; + } + else { + struct ibv_wc wc; + if (rdma_get_send_comp (id, &wc) < 0) { + retval = FALSE; + } + g_debug ("WC Status: %i", wc.status); + } + + G_UNLOCK (sync_lock); + return retval; +} + KiroClient * kiro_client_new (void) @@ -134,7 +155,6 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) struct rdma_cm_event *active_event; if (0 <= rdma_get_cm_event (priv->ec, &active_event)) { - //Disable cancellation to prevent undefined states during shutdown struct rdma_cm_event *ev = g_try_malloc (sizeof (*active_event)); if (!ev) { @@ -190,13 +210,13 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE); if (!ctx->rdma_mr) { - //TODO: Connection teardown in an event handler routine? Not a good + //FIXME: Connection teardown in an event handler routine? Not a good //idea... g_critical ("Failed to allocate memory for receive buffer (Out of memory?)"); rdma_disconnect (priv->conn); kiro_destroy_connection_context (&ctx); rdma_destroy_ep (priv->conn); - return FALSE; + return TRUE; } } if (type == KIRO_PONG) { @@ -215,11 +235,41 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) G_UNLOCK (ping_time); } + if (type == KIRO_REALLOC) { + g_debug ("Got reallocation request from server."); + struct kiro_ctrl_msg *msg = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem); + + G_LOCK (sync_lock); + g_debug ("Rallocating memory..."); + kiro_destroy_rdma_memory (ctx->rdma_mr); + ctx->peer_mr = msg->peer_mri; + g_debug ("New size is: %zu", ctx->peer_mr.length); + ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE); + G_UNLOCK (sync_lock); + + if (!ctx->rdma_mr) { + //FIXME: Connection teardown in an event handler routine? Not a good + //idea... + g_critical ("Failed to allocate memory for receive buffer (Out of memory?)"); + rdma_disconnect (priv->conn); + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (priv->conn); + } + + msg = ((struct kiro_ctrl_msg *)ctx->cf_mr_send->mem); + msg->msg_type = KIRO_ACK_RDMA; + if (!send_msg (priv->conn, ctx->cf_mr_send)) { + g_warning ("Failure while trying to post SEND for reallocation ACK: %s", strerror (errno)); + } + else { + g_debug ("Sent ACK to server"); + } + } //Post a generic receive in order to stay responsive to any messages from //the server if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { - //TODO: Connection teardown in an event handler routine? Not a good + //FIXME: Connection teardown in an event handler routine? Not a good //idea... g_critical ("Posting generic receive for connection failed: %s", strerror (errno)); kiro_destroy_connection_context (&ctx); @@ -363,6 +413,7 @@ kiro_client_sync (KiroClient *self) struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; + G_LOCK (sync_lock); if (rdma_post_read (priv->conn, priv->conn, ctx->rdma_mr->mem, ctx->peer_mr.length, ctx->rdma_mr->mr, 0, (uint64_t)ctx->peer_mr.addr, ctx->peer_mr.rkey)) { g_critical ("Failed to RDMA_READ from server: %s", strerror (errno)); goto fail; @@ -377,6 +428,7 @@ kiro_client_sync (KiroClient *self) switch (wc.status) { case IBV_WC_SUCCESS: + G_UNLOCK (sync_lock); return 0; case IBV_WC_RETRY_EXC_ERR: g_critical ("Server no longer responding"); @@ -390,6 +442,7 @@ kiro_client_sync (KiroClient *self) fail: kiro_destroy_connection (&(priv->conn)); + G_UNLOCK (sync_lock); return -1; } @@ -399,6 +452,7 @@ ping_timeout (gpointer data) { //Not needed. Void it to prevent 'unused variable' warning (void) data; + g_debug ("PING timed out"); G_LOCK (ping_time); @@ -444,20 +498,15 @@ kiro_client_ping_server (KiroClient *self) struct timeval local_time; gettimeofday (&local_time, NULL); - if (rdma_post_send (priv->conn, priv->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { + if (!send_msg (priv->conn, ctx->cf_mr_send)) { g_warning ("Failure while trying to post SEND for PING: %s", strerror (errno)); t_usec = -1; + G_UNLOCK (ping_time); goto end; } + g_debug ("PING message sent to server."); G_UNLOCK (ping_time); - struct ibv_wc wc; - if (rdma_get_send_comp (priv->conn, &wc) < 0) { - g_warning ("Failure during PING send: %s", strerror (errno)); - t_usec = -1; - goto end; - } - // Set a two-second timeout for the ping guint timeout = g_timeout_add_seconds (2, ping_timeout, NULL); diff --git a/src/kiro-rdma.h b/src/kiro-rdma.h index 6baa017..c17e044 100644 --- a/src/kiro-rdma.h +++ b/src/kiro-rdma.h @@ -81,9 +81,9 @@ kiro_attach_qp (struct rdma_cm_id *id) id->pd = ibv_alloc_pd (id->verbs); id->send_cq_channel = ibv_create_comp_channel (id->verbs); - id->recv_cq_channel = id->send_cq_channel; //we use one shared completion channel + id->recv_cq_channel = ibv_create_comp_channel (id->verbs); id->send_cq = ibv_create_cq (id->verbs, 1, id, id->send_cq_channel, 0); - id->recv_cq = id->send_cq; //we use one shared completion queue + id->recv_cq = ibv_create_cq (id->verbs, 1, id, id->recv_cq_channel, 0); struct ibv_qp_init_attr qp_attr; memset (&qp_attr, 0, sizeof (struct ibv_qp_init_attr)); qp_attr.qp_context = (void *) (uintptr_t) id; diff --git a/src/kiro-server.c b/src/kiro-server.c index 0d62ae7..f8dd9d3 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -68,10 +68,11 @@ G_DEFINE_TYPE (KiroServer, kiro_server, G_TYPE_OBJECT); // List of clients that were asked to realloc their memory -GList *realloc_clients; +GList *realloc_list; // Temporary lock for connecting clients G_LOCK_DEFINE (connection_handling); +G_LOCK_DEFINE (rdma_handling); // Used to prevent raceconditions during realloc timeout G_LOCK_DEFINE (realloc_timeout); @@ -86,6 +87,30 @@ struct kiro_client_connection { }; +G_LOCK_DEFINE (send_lock); + +static inline gboolean +send_msg (struct rdma_cm_id *id, struct kiro_rdma_mem *r) +{ + gboolean retval = TRUE; + G_LOCK (send_lock); + g_debug ("Sending message"); + if (rdma_post_send (id, id, r->mem, r->size, r->mr, IBV_SEND_SIGNALED)) { + retval = FALSE; + } + else { + struct ibv_wc wc; + if (rdma_get_send_comp (id, &wc) < 0) { + retval = FALSE; + } + g_debug ("WC Status: %i", wc.status); + } + + G_UNLOCK (send_lock); + return retval; +} + + KiroServer * kiro_server_new (void) { @@ -210,23 +235,14 @@ grant_client_access (struct rdma_cm_id *client, void *mem, size_t mem_size, guin struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem); msg->msg_type = type; - msg->peer_mri = * (ctx->rdma_mr->mr); - if (rdma_post_send (client, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { + if (!send_msg (client, ctx->cf_mr_send)) { g_warning ("Failure while trying to post SEND: %s", strerror (errno)); kiro_destroy_rdma_memory (ctx->rdma_mr); return -1; } - struct ibv_wc wc; - - if (rdma_get_send_comp (client, &wc) < 0) { - g_warning ("Failed to post RDMA MRI to client: %s", strerror (errno)); - kiro_destroy_rdma_memory (ctx->rdma_mr); - return -1; - } - g_debug ("RDMA MRI sent to client"); return 0; } @@ -235,19 +251,30 @@ grant_client_access (struct rdma_cm_id *client, void *mem, size_t mem_size, guin static gboolean process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) { - // Right now, we don't need 'source' and 'condition' - // Tell the compiler to ignore them by (void)-ing them + // Right now, we don't need 'source' + // Tell the compiler to ignore it by (void)-ing it (void) source; - //(void) condition; - g_debug ("Message condition: %i", condition); - struct kiro_client_connection *cc = (struct kiro_client_connection *)data; + if (!G_TRYLOCK (rdma_handling)) { + g_debug ("RDMA handling will wait for the next dispatch."); + return TRUE; + } + + g_debug ("Got message on condition: %i", condition); + void *payload = ((GList *)data)->data; + struct kiro_client_connection *cc = (struct kiro_client_connection *)payload; struct ibv_wc wc; - if (ibv_poll_cq (cc->conn->recv_cq, 1, &wc) < 0) { + gint num_comp = ibv_poll_cq (cc->conn->recv_cq, 1, &wc); + if (!num_comp) { + g_critical ("RDMA event handling was triggered, but there is no completion on the queue"); + goto end_rmda_eh; + } + if (num_comp < 0) { g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno)); - return FALSE; + goto end_rmda_eh; } + g_debug ("Got %i receive events from the queue", num_comp); void *cq_ctx; struct ibv_cq *cq; int err = ibv_get_cq_event (cc->conn->recv_cq_channel, &cq, &cq_ctx); @@ -258,18 +285,38 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; g_debug ("Received a message from Client %u of type %u", cc->id, type); - if (type == KIRO_PING) { - struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem); - msg->msg_type = KIRO_PONG; + switch (type) { + case KIRO_PING: + { + struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem); + msg->msg_type = KIRO_PONG; - if (rdma_post_send (cc->conn, cc->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { - g_warning ("Failure while trying to post PONG send: %s", strerror (errno)); - goto done; + if (!send_msg (cc->conn, ctx->cf_mr_send)) { + g_warning ("Failure while trying to post PONG send: %s", strerror (errno)); + goto done; + } + break; } - - if (rdma_get_send_comp (cc->conn, &wc) < 0) { - g_warning ("An error occured while sending PONG: %s", strerror (errno)); + case KIRO_ACK_RDMA: + { + g_debug ("ACK received"); + if (G_TRYLOCK (realloc_timeout)) { + g_debug ("Client %i has ACKed the reallocation request", cc->id); + GList *client = g_list_find (realloc_list, (gpointer)cc); + if (client) { + realloc_list = g_list_remove_link (realloc_list, client); + if (cc->backup_mri->mr) + ibv_dereg_mr (cc->backup_mri->mr); + g_free (cc->backup_mri); + cc->backup_mri = NULL; + g_debug ("Client %i removed from realloc_list", cc->id); + } + G_UNLOCK (realloc_timeout); + } + break; } + default: + g_debug ("Message Type is unknow. Ignoring..."); } done: @@ -281,12 +328,15 @@ done: g_critical ("Posting generic receive for event handling failed: %s", strerror (errno)); kiro_destroy_connection_context (&ctx); rdma_destroy_ep (cc->conn); - return FALSE; + goto end_rmda_eh; } ibv_req_notify_cq (cc->conn->recv_cq, 0); // Make the respective Queue push events onto the channel g_debug ("Finished RDMA event handling"); + +end_rmda_eh: + G_UNLOCK (rdma_handling); return TRUE; } @@ -299,9 +349,11 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) (void) source; (void) condition; + g_debug ("CM event handler triggered"); if (!G_TRYLOCK (connection_handling)) { // Unsafe to handle connection management right now. // Wait for next dispatch. + g_debug ("Connection handling is busy. Waiting for next dispatch"); return TRUE; } @@ -359,10 +411,16 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) cc->id = ctx->identifier; cc->conn = ev->id; cc->rcv_ec = g_io_channel_unix_new (ev->id->recv_cq_channel->fd); - cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)cc); + priv->clients = g_list_append (priv->clients, (gpointer)cc); + GList *client = g_list_find (priv->clients, (gpointer)cc); + if (!client->data || client->data != cc) { + g_critical ("Could not add client to list"); + goto fail; + } + + cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)client); g_io_channel_unref (cc->rcv_ec); // main_loop now holds a reference. We don't need ours any more - priv->clients = g_list_append (priv->clients, (gpointer)cc); g_debug ("Client connection assigned with ID %u", ctx->identifier); g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients)); break; @@ -412,6 +470,7 @@ exit: } G_UNLOCK (connection_handling); + g_debug ("CM event handling done"); return TRUE; } @@ -549,30 +608,6 @@ disconnect_client (gpointer data, gpointer user_data) } -/* - * NOTE: - * When sending the reconnection request to the clients, we try to copy all the - * clients from the current pirv->clients list to the new realloc_clients list. - * We will then remove eache ACKed client from the _OLD_ list, and will use that - * list as well to determine clients that have not responded in time. - * Afterwards, we will swap the old and the new list pointers to restore the - * correct list naming. - * - * This is a trick to circumvent problems with clients that, for whatever - * reason, can not be copied to the 'new' list. If that happens, we would not - * even recognize that a client was not informed, because it never appears in - * the new list to begin with, and the client would therefore survive the - * timeout for reallocation. That clients peer_mri would then never be unpinned - * (unless the server stops or the client disconnects), causing MASSIVE memory - * leakage. Also, the client would continue to read stale data in the best case, - * or newly allocated garbage in the worst case. - * - * Since all currently connected clients are guaranteed to be stored in the old - * list, using that one to detect failed ACKs makes much more sense, since - * clients that failed to be informed of reallocation would simply never send an - * ACK, stay on the list, and then securely be disconnected after the timeout. - **/ - void request_client_realloc (gpointer data, gpointer user_data) { @@ -580,11 +615,11 @@ request_client_realloc (gpointer data, gpointer user_data) { g_critical ("Either client or server pointer was lost during client reconnect!"); // The client will remain in the old list, never receive the // reallocation request, therefore never send an ACK and therefore will - // be forcefully disconnected once the timeout happens. See Note above. + // be forcefully disconnected once the timeout happens. return; } - realloc_clients = g_list_append (realloc_clients, data); + realloc_list = g_list_append (realloc_list, data); struct kiro_client_connection *cc = (struct kiro_client_connection *)data; struct kiro_connection_context *ctx = (struct kiro_connection_context *)cc->conn->context; @@ -600,6 +635,7 @@ request_client_realloc (gpointer data, gpointer user_data) { ctx->rdma_mr = cc->backup_mri; cc->backup_mri = NULL; g_warning ("Failed to request REALLOC for client %i", cc->id); + G_UNLOCK (rdma_handling); return; } g_debug ("Client %i REALLOC request sent.", cc->id); @@ -618,55 +654,103 @@ client_realloc_timeout (gpointer data) { } +/* + * NOTE: + * Since all currently connected clients are guaranteed to be stored in the + * priv->clients list, using that one to detect failed ACKs makes much more + * sense, since clients that failed to be informed of reallocation would simply + * never send an ACK, stay on the list, and then securely be disconnected after + * the timeout. + * + * Therefore, we first try to send the REALLOC request to the clients, then try + * to copy them to the realloc_list. Once this is done, the realloc_list will only + * contain the list of clients that are guaranteed to have received the REALLOC + * request. We then swap the realloc_list and priv->clients list. The + * realloc_list then contains all of the clients that were previously + * connected, before we started to send out REALLOC requests. This makes it easy + * for us to ensure, that we don't 'forget' any client during timeout check. + * + * This is a trick to circumvent problems with clients that, for whatever + * reason, could not be copied to the 'new' list. If that happens, we would not + * even recognize that a client was not informed, because it never appears in + * the new list to begin with, and the client would therefore survive the + * timeout for reallocation. That clients peer_mri would then never be unpinned + * (unless the server stops or the client disconnects), causing MASSIVE memory + * leakage. Also, the client would continue to read stale data in the best case, + * or newly allocated garbage in the worst case. + **/ + void kiro_server_realloc (KiroServer *self, void *mem, size_t size) { if (!self) return; + g_debug ("Starting realloc"); + struct kiro_rdma_mem rdma_mem; rdma_mem.mem = mem; rdma_mem.size = size; KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE (self); + G_LOCK (connection_handling); + G_LOCK (rdma_handling); + + priv->mem = mem; + priv->mem_size = size; + if (!priv->clients) { + g_debug ("No clients to reconnect. Done."); + G_UNLOCK (rdma_handling); + G_UNLOCK (connection_handling); + return; + } g_list_foreach (priv->clients, request_client_realloc, &rdma_mem); + // Swap the two lists. See Note above. + GList *tmp = priv->clients; + priv->clients = realloc_list; + realloc_list = tmp; + G_UNLOCK (rdma_handling); + guint timeout = g_timeout_add_seconds (2, client_realloc_timeout, NULL); timeout_done = FALSE; - while (!timeout_done) {}; + while (!timeout_done) { + if (!realloc_list) { + g_debug ("All clients have ACKed"); + break; // all clients ACKed + } + } - // Remove the timeout GSource *timeout_source = g_main_context_find_source_by_id (NULL, timeout); if (timeout_source) { g_source_destroy (timeout_source); } G_LOCK (realloc_timeout); - GList *current = g_list_first (priv->clients); + GList *current = g_list_first (realloc_list); while (current) { - GList *client = g_list_find (realloc_clients, current->data); + struct kiro_client_connection *cc = (struct kiro_client_connection *)current->data; + g_debug ("Client %i did not ACK the REALLOC request in time.", cc->id); + GList *client = g_list_find (priv->clients, current->data); if (client) { - struct kiro_client_connection *cc = (struct kiro_client_connection *)client->data; - g_debug ("Client %i did not ACK the REALLOC request in time.", cc->id); - disconnect_client (client->data, NULL); - realloc_clients = g_list_delete_link (realloc_clients, client); + priv->clients = g_list_delete_link (priv->clients, client); } + disconnect_client (current->data, NULL); current = g_list_next (current); } - g_list_free (priv->clients); - priv->clients = realloc_clients; - realloc_clients = NULL; + + if (realloc_list) { + g_list_free (realloc_list); + realloc_list = NULL; + } G_UNLOCK (realloc_timeout); - // CHANGE INTERNAL POINTERS FOR MEM!! - // priv->mem = mem, etc.. + g_debug ("Realloc procedure done!"); G_UNLOCK (connection_handling); - - } -- cgit v1.2.3