summaryrefslogtreecommitdiffstats
path: root/src/kiro-server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/kiro-server.c')
-rw-r--r--src/kiro-server.c226
1 files changed, 155 insertions, 71 deletions
diff --git a/src/kiro-server.c b/src/kiro-server.c
index 0d62ae7..f8dd9d3 100644
--- a/src/kiro-server.c
+++ b/src/kiro-server.c
@@ -68,10 +68,11 @@ G_DEFINE_TYPE (KiroServer, kiro_server, G_TYPE_OBJECT);
// List of clients that were asked to realloc their memory
-GList *realloc_clients;
+GList *realloc_list;
// Temporary lock for connecting clients
G_LOCK_DEFINE (connection_handling);
+G_LOCK_DEFINE (rdma_handling);
// Used to prevent raceconditions during realloc timeout
G_LOCK_DEFINE (realloc_timeout);
@@ -86,6 +87,30 @@ struct kiro_client_connection {
};
+G_LOCK_DEFINE (send_lock);
+
+static inline gboolean
+send_msg (struct rdma_cm_id *id, struct kiro_rdma_mem *r)
+{
+ gboolean retval = TRUE;
+ G_LOCK (send_lock);
+ g_debug ("Sending message");
+ if (rdma_post_send (id, id, r->mem, r->size, r->mr, IBV_SEND_SIGNALED)) {
+ retval = FALSE;
+ }
+ else {
+ struct ibv_wc wc;
+ if (rdma_get_send_comp (id, &wc) < 0) {
+ retval = FALSE;
+ }
+ g_debug ("WC Status: %i", wc.status);
+ }
+
+ G_UNLOCK (send_lock);
+ return retval;
+}
+
+
KiroServer *
kiro_server_new (void)
{
@@ -210,23 +235,14 @@ grant_client_access (struct rdma_cm_id *client, void *mem, size_t mem_size, guin
struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem);
msg->msg_type = type;
-
msg->peer_mri = * (ctx->rdma_mr->mr);
- if (rdma_post_send (client, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) {
+ if (!send_msg (client, ctx->cf_mr_send)) {
g_warning ("Failure while trying to post SEND: %s", strerror (errno));
kiro_destroy_rdma_memory (ctx->rdma_mr);
return -1;
}
- struct ibv_wc wc;
-
- if (rdma_get_send_comp (client, &wc) < 0) {
- g_warning ("Failed to post RDMA MRI to client: %s", strerror (errno));
- kiro_destroy_rdma_memory (ctx->rdma_mr);
- return -1;
- }
-
g_debug ("RDMA MRI sent to client");
return 0;
}
@@ -235,19 +251,30 @@ grant_client_access (struct rdma_cm_id *client, void *mem, size_t mem_size, guin
static gboolean
process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
{
- // Right now, we don't need 'source' and 'condition'
- // Tell the compiler to ignore them by (void)-ing them
+ // Right now, we don't need 'source'
+ // Tell the compiler to ignore it by (void)-ing it
(void) source;
- //(void) condition;
- g_debug ("Message condition: %i", condition);
- struct kiro_client_connection *cc = (struct kiro_client_connection *)data;
+ if (!G_TRYLOCK (rdma_handling)) {
+ g_debug ("RDMA handling will wait for the next dispatch.");
+ return TRUE;
+ }
+
+ g_debug ("Got message on condition: %i", condition);
+ void *payload = ((GList *)data)->data;
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)payload;
struct ibv_wc wc;
- if (ibv_poll_cq (cc->conn->recv_cq, 1, &wc) < 0) {
+ gint num_comp = ibv_poll_cq (cc->conn->recv_cq, 1, &wc);
+ if (!num_comp) {
+ g_critical ("RDMA event handling was triggered, but there is no completion on the queue");
+ goto end_rmda_eh;
+ }
+ if (num_comp < 0) {
g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno));
- return FALSE;
+ goto end_rmda_eh;
}
+ g_debug ("Got %i receive events from the queue", num_comp);
void *cq_ctx;
struct ibv_cq *cq;
int err = ibv_get_cq_event (cc->conn->recv_cq_channel, &cq, &cq_ctx);
@@ -258,18 +285,38 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type;
g_debug ("Received a message from Client %u of type %u", cc->id, type);
- if (type == KIRO_PING) {
- struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem);
- msg->msg_type = KIRO_PONG;
+ switch (type) {
+ case KIRO_PING:
+ {
+ struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem);
+ msg->msg_type = KIRO_PONG;
- if (rdma_post_send (cc->conn, cc->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) {
- g_warning ("Failure while trying to post PONG send: %s", strerror (errno));
- goto done;
+ if (!send_msg (cc->conn, ctx->cf_mr_send)) {
+ g_warning ("Failure while trying to post PONG send: %s", strerror (errno));
+ goto done;
+ }
+ break;
}
-
- if (rdma_get_send_comp (cc->conn, &wc) < 0) {
- g_warning ("An error occured while sending PONG: %s", strerror (errno));
+ case KIRO_ACK_RDMA:
+ {
+ g_debug ("ACK received");
+ if (G_TRYLOCK (realloc_timeout)) {
+ g_debug ("Client %i has ACKed the reallocation request", cc->id);
+ GList *client = g_list_find (realloc_list, (gpointer)cc);
+ if (client) {
+ realloc_list = g_list_remove_link (realloc_list, client);
+ if (cc->backup_mri->mr)
+ ibv_dereg_mr (cc->backup_mri->mr);
+ g_free (cc->backup_mri);
+ cc->backup_mri = NULL;
+ g_debug ("Client %i removed from realloc_list", cc->id);
+ }
+ G_UNLOCK (realloc_timeout);
+ }
+ break;
}
+ default:
+ g_debug ("Message Type is unknow. Ignoring...");
}
done:
@@ -281,12 +328,15 @@ done:
g_critical ("Posting generic receive for event handling failed: %s", strerror (errno));
kiro_destroy_connection_context (&ctx);
rdma_destroy_ep (cc->conn);
- return FALSE;
+ goto end_rmda_eh;
}
ibv_req_notify_cq (cc->conn->recv_cq, 0); // Make the respective Queue push events onto the channel
g_debug ("Finished RDMA event handling");
+
+end_rmda_eh:
+ G_UNLOCK (rdma_handling);
return TRUE;
}
@@ -299,9 +349,11 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
(void) source;
(void) condition;
+ g_debug ("CM event handler triggered");
if (!G_TRYLOCK (connection_handling)) {
// Unsafe to handle connection management right now.
// Wait for next dispatch.
+ g_debug ("Connection handling is busy. Waiting for next dispatch");
return TRUE;
}
@@ -359,10 +411,16 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
cc->id = ctx->identifier;
cc->conn = ev->id;
cc->rcv_ec = g_io_channel_unix_new (ev->id->recv_cq_channel->fd);
- cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)cc);
+ priv->clients = g_list_append (priv->clients, (gpointer)cc);
+ GList *client = g_list_find (priv->clients, (gpointer)cc);
+ if (!client->data || client->data != cc) {
+ g_critical ("Could not add client to list");
+ goto fail;
+ }
+
+ cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)client);
g_io_channel_unref (cc->rcv_ec); // main_loop now holds a reference. We don't need ours any more
- priv->clients = g_list_append (priv->clients, (gpointer)cc);
g_debug ("Client connection assigned with ID %u", ctx->identifier);
g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients));
break;
@@ -412,6 +470,7 @@ exit:
}
G_UNLOCK (connection_handling);
+ g_debug ("CM event handling done");
return TRUE;
}
@@ -549,30 +608,6 @@ disconnect_client (gpointer data, gpointer user_data)
}
-/*
- * NOTE:
- * When sending the reconnection request to the clients, we try to copy all the
- * clients from the current pirv->clients list to the new realloc_clients list.
- * We will then remove eache ACKed client from the _OLD_ list, and will use that
- * list as well to determine clients that have not responded in time.
- * Afterwards, we will swap the old and the new list pointers to restore the
- * correct list naming.
- *
- * This is a trick to circumvent problems with clients that, for whatever
- * reason, can not be copied to the 'new' list. If that happens, we would not
- * even recognize that a client was not informed, because it never appears in
- * the new list to begin with, and the client would therefore survive the
- * timeout for reallocation. That clients peer_mri would then never be unpinned
- * (unless the server stops or the client disconnects), causing MASSIVE memory
- * leakage. Also, the client would continue to read stale data in the best case,
- * or newly allocated garbage in the worst case.
- *
- * Since all currently connected clients are guaranteed to be stored in the old
- * list, using that one to detect failed ACKs makes much more sense, since
- * clients that failed to be informed of reallocation would simply never send an
- * ACK, stay on the list, and then securely be disconnected after the timeout.
- **/
-
void
request_client_realloc (gpointer data, gpointer user_data) {
@@ -580,11 +615,11 @@ request_client_realloc (gpointer data, gpointer user_data) {
g_critical ("Either client or server pointer was lost during client reconnect!");
// The client will remain in the old list, never receive the
// reallocation request, therefore never send an ACK and therefore will
- // be forcefully disconnected once the timeout happens. See Note above.
+ // be forcefully disconnected once the timeout happens.
return;
}
- realloc_clients = g_list_append (realloc_clients, data);
+ realloc_list = g_list_append (realloc_list, data);
struct kiro_client_connection *cc = (struct kiro_client_connection *)data;
struct kiro_connection_context *ctx = (struct kiro_connection_context *)cc->conn->context;
@@ -600,6 +635,7 @@ request_client_realloc (gpointer data, gpointer user_data) {
ctx->rdma_mr = cc->backup_mri;
cc->backup_mri = NULL;
g_warning ("Failed to request REALLOC for client %i", cc->id);
+ G_UNLOCK (rdma_handling);
return;
}
g_debug ("Client %i REALLOC request sent.", cc->id);
@@ -618,55 +654,103 @@ client_realloc_timeout (gpointer data) {
}
+/*
+ * NOTE:
+ * Since all currently connected clients are guaranteed to be stored in the
+ * priv->clients list, using that one to detect failed ACKs makes much more
+ * sense, since clients that failed to be informed of reallocation would simply
+ * never send an ACK, stay on the list, and then securely be disconnected after
+ * the timeout.
+ *
+ * Therefore, we first try to send the REALLOC request to the clients, then try
+ * to copy them to the realloc_list. Once this is done, the realloc_list will only
+ * contain the list of clients that are guaranteed to have received the REALLOC
+ * request. We then swap the realloc_list and priv->clients list. The
+ * realloc_list then contains all of the clients that were previously
+ * connected, before we started to send out REALLOC requests. This makes it easy
+ * for us to ensure, that we don't 'forget' any client during timeout check.
+ *
+ * This is a trick to circumvent problems with clients that, for whatever
+ * reason, could not be copied to the 'new' list. If that happens, we would not
+ * even recognize that a client was not informed, because it never appears in
+ * the new list to begin with, and the client would therefore survive the
+ * timeout for reallocation. That clients peer_mri would then never be unpinned
+ * (unless the server stops or the client disconnects), causing MASSIVE memory
+ * leakage. Also, the client would continue to read stale data in the best case,
+ * or newly allocated garbage in the worst case.
+ **/
+
void
kiro_server_realloc (KiroServer *self, void *mem, size_t size) {
if (!self)
return;
+ g_debug ("Starting realloc");
+
struct kiro_rdma_mem rdma_mem;
rdma_mem.mem = mem;
rdma_mem.size = size;
KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE (self);
+
G_LOCK (connection_handling);
+ G_LOCK (rdma_handling);
+
+ priv->mem = mem;
+ priv->mem_size = size;
+ if (!priv->clients) {
+ g_debug ("No clients to reconnect. Done.");
+ G_UNLOCK (rdma_handling);
+ G_UNLOCK (connection_handling);
+ return;
+ }
g_list_foreach (priv->clients, request_client_realloc, &rdma_mem);
+ // Swap the two lists. See Note above.
+ GList *tmp = priv->clients;
+ priv->clients = realloc_list;
+ realloc_list = tmp;
+ G_UNLOCK (rdma_handling);
+
guint timeout = g_timeout_add_seconds (2, client_realloc_timeout, NULL);
timeout_done = FALSE;
- while (!timeout_done) {};
+ while (!timeout_done) {
+ if (!realloc_list) {
+ g_debug ("All clients have ACKed");
+ break; // all clients ACKed
+ }
+ }
- // Remove the timeout
GSource *timeout_source = g_main_context_find_source_by_id (NULL, timeout);
if (timeout_source) {
g_source_destroy (timeout_source);
}
G_LOCK (realloc_timeout);
- GList *current = g_list_first (priv->clients);
+ GList *current = g_list_first (realloc_list);
while (current) {
- GList *client = g_list_find (realloc_clients, current->data);
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)current->data;
+ g_debug ("Client %i did not ACK the REALLOC request in time.", cc->id);
+ GList *client = g_list_find (priv->clients, current->data);
if (client) {
- struct kiro_client_connection *cc = (struct kiro_client_connection *)client->data;
- g_debug ("Client %i did not ACK the REALLOC request in time.", cc->id);
- disconnect_client (client->data, NULL);
- realloc_clients = g_list_delete_link (realloc_clients, client);
+ priv->clients = g_list_delete_link (priv->clients, client);
}
+ disconnect_client (current->data, NULL);
current = g_list_next (current);
}
- g_list_free (priv->clients);
- priv->clients = realloc_clients;
- realloc_clients = NULL;
+
+ if (realloc_list) {
+ g_list_free (realloc_list);
+ realloc_list = NULL;
+ }
G_UNLOCK (realloc_timeout);
- // CHANGE INTERNAL POINTERS FOR MEM!!
- // priv->mem = mem, etc..
+ g_debug ("Realloc procedure done!");
G_UNLOCK (connection_handling);
-
-
}