summaryrefslogtreecommitdiffstats
path: root/kiro-server.c
diff options
context:
space:
mode:
Diffstat (limited to 'kiro-server.c')
-rw-r--r--kiro-server.c163
1 files changed, 107 insertions, 56 deletions
diff --git a/kiro-server.c b/kiro-server.c
index c017b07..52304c8 100644
--- a/kiro-server.c
+++ b/kiro-server.c
@@ -50,9 +50,13 @@ struct _KiroServerPrivate {
/* 'Real' private structures */
/* (Not accessible by properties) */
- struct rdma_event_channel *ec; // Main Event Channel
- struct rdma_cm_id *base; // Base-Listening-Connection
- struct kiro_connection *client; // Connection to the client
+ struct rdma_event_channel *ec; // Main Event Channel
+ struct rdma_cm_id *base; // Base-Listening-Connection
+ struct kiro_connection *client; // Connection to the client
+ pthread_t event_listener; // Pointer to the completion-listener thread of this connection
+ pthread_mutex_t mtx; // Mutex to signal the listener-thread termination
+ void *mem; // Pointer to the server buffer
+ size_t mem_size; // Server Buffer Size in bytes
};
@@ -82,13 +86,23 @@ kiro_server_class_init (KiroServerClass *klass)
}
-static int connect_client (struct kiro_connection *client)
+static int connect_client (struct rdma_cm_id *client)
{
+ if(!client)
+ return -1;
+
+ if( -1 == kiro_attach_qp(client))
+ {
+ printf("Could not create a QP for the new connection.\n");
+ rdma_destroy_id(client);
+ return -1;
+ }
+
struct kiro_connection_context *ctx = (struct kiro_connection_context *)calloc(1,sizeof(struct kiro_connection_context));
if(!ctx)
{
printf("Failed to create connection context.\n");
- rdma_destroy_id(client->id);
+ rdma_destroy_id(client);
return -1;
}
@@ -100,25 +114,25 @@ static int connect_client (struct kiro_connection *client)
goto error;
}
- ctx->cf_mr_recv = kiro_create_rdma_memory(client->id->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE);
- ctx->cf_mr_send = kiro_create_rdma_memory(client->id->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE);
+ ctx->cf_mr_recv = kiro_create_rdma_memory(client->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE);
+ ctx->cf_mr_send = kiro_create_rdma_memory(client->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE);
if(!ctx->cf_mr_recv || !ctx->cf_mr_send)
{
printf("Failed to register control message memory.\n");
goto error;
}
ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof(struct kiro_ctrl_msg);
- client->id->context = ctx;
+ client->context = ctx;
- if(rdma_post_recv(client->id, client, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr))
+ if(rdma_post_recv(client, client, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr))
{
printf("Posting preemtive receive for connection failed.\n");
goto error;
}
- if(rdma_accept(client->id, NULL))
+ if(rdma_accept(client, NULL))
{
- printf("Failed to establish connection to the server.\n");
+ printf("Failed to establish connection to the client with error: %i.\n", errno);
goto error;
}
printf("Client Connected.\n");
@@ -126,16 +140,16 @@ static int connect_client (struct kiro_connection *client)
error:
- rdma_reject(client->id, NULL, 0);
+ rdma_reject(client, NULL, 0);
kiro_destroy_connection_context(&ctx);
- rdma_destroy_id(client->id);
+ rdma_destroy_id(client);
return -1;
}
-static int welcome_client (struct kiro_connection *client, void *mem, size_t mem_size)
+static int welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size)
{
- struct kiro_connection_context *ctx = (struct kiro_connection_context *)(client->id->context);
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *)(client->context);
ctx->rdma_mr = (struct kiro_rdma_mem *)calloc(1, sizeof(struct kiro_rdma_mem));
if(!ctx->rdma_mr)
{
@@ -145,7 +159,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem
ctx->rdma_mr->mem = mem;
ctx->rdma_mr->size = mem_size;
- ctx->rdma_mr->mr = rdma_reg_read(client->id, ctx->rdma_mr->mem, ctx->rdma_mr->size);
+ ctx->rdma_mr->mr = rdma_reg_read(client, ctx->rdma_mr->mem, ctx->rdma_mr->size);
if(!ctx->rdma_mr->mr)
{
printf("Failed to register RDMA Memory Region.\n");
@@ -157,7 +171,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem
msg->msg_type = KIRO_ACK_RDMA;
msg->peer_mri = *(ctx->rdma_mr->mr);
- if(rdma_post_send(client->id, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED))
+ if(rdma_post_send(client, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED))
{
printf("Failure while trying to post SEND.\n");
kiro_destroy_rdma_memory(ctx->rdma_mr);
@@ -166,7 +180,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem
struct ibv_wc wc;
- if(rdma_get_send_comp(client->id, &wc) < 0)
+ if(rdma_get_send_comp(client, &wc) < 0)
{
printf("Failed to post RDMA MRI to client.\n");
kiro_destroy_rdma_memory(ctx->rdma_mr);
@@ -178,6 +192,71 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem
}
+void * event_loop (void *self)
+{
+ KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE((KiroServer *)self);
+ struct rdma_cm_event *active_event;
+
+ int stop = 0;
+
+ while(0 == stop) {
+ if(0 <= rdma_get_cm_event(priv->ec, &active_event))
+ {
+
+ struct rdma_cm_event *ev = malloc(sizeof(*active_event));
+ if(!ev)
+ {
+ printf("Unable to allocate memory for Event handling!\n");
+ rdma_ack_cm_event(active_event);
+ continue;
+ }
+ memcpy(ev, active_event, sizeof(*active_event));
+ rdma_ack_cm_event(active_event);
+
+ if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
+ {
+
+ /*
+ priv->client = (struct kiro_connection *)calloc(1, sizeof(struct kiro_connection));
+ if(!(priv->client))
+ {
+ printf("Failed to create container for client connection.\n");
+ free(ev);
+ continue;
+ }
+ priv->client->identifier = 0; //First Client
+ priv->client->id = ev->id;
+ */
+
+ if(0 == connect_client(ev->id))
+ {
+ // Connection set-up successfully! (Server)
+ // Post a welcoming "Recieve" for handshaking
+ welcome_client(ev->id, priv->mem, priv->mem_size);
+ }
+ }
+ else if(ev->event == RDMA_CM_EVENT_DISCONNECTED)
+ {
+ printf("Got disconnect request.\n");
+ //pthread_mutex_unlock(&(priv->mtx));
+ kiro_destroy_connection(&(ev->id));
+ printf("Connection closed successfully\n");
+ }
+ free(ev);
+ }
+
+ // Mutex will be freed as a signal to stop request
+ if(0 == pthread_mutex_trylock(&(priv->mtx)))
+ stop = 1;
+ }
+
+ printf("Closing Event Listener Thread\n");
+ return NULL;
+}
+
+
+
+
int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, size_t mem_size)
{
KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE(self);
@@ -245,43 +324,10 @@ int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, s
rdma_destroy_ep(priv->base);
return -1;
}
- printf("Enpoint listening.\n");
-
-
- priv->client = (struct kiro_connection *)calloc(1, sizeof(struct kiro_connection));
- if(!(priv->client))
- {
- printf("Failed to create container for client connection.\n");
- return -1;
- }
- priv->client->identifier = 0; //First Client
-
- printf("Waiting for connection request.\n");
- if(rdma_get_request(priv->base, &(priv->client->id)))
- {
- printf("Failure waiting for clienet connection.\n");
- rdma_destroy_ep(priv->base);
- return -1;
- }
- printf("Connection Request received.\n");
-
-
- if(connect_client(priv->client))
- {
- printf("Client connection failed!\n");
- rdma_destroy_ep(priv->base);
- free(priv->client);
- return -1;
- }
-
- if(welcome_client(priv->client, mem, mem_size))
- {
- printf("Failed to setup client communication.\n");
- kiro_destroy_connection(&(priv->client));
- rdma_destroy_id(priv->base);
- return -1;
- }
+ priv->mem = mem;
+ priv->mem_size = mem_size;
+
priv->ec = rdma_create_event_channel();
int oldflags = fcntl (priv->ec->fd, F_GETFL, 0);
/* Only change the FD Mode if we were able to get its flags */
@@ -293,10 +339,15 @@ int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, s
if(rdma_migrate_id(priv->base, priv->ec))
{
printf("Was unable to migrate connection to new Event Channel.\n");
- kiro_destroy_connection(&(priv->client));
- rdma_destroy_id(priv->base);
+ rdma_destroy_ep(priv->base);
return -1;
}
+
+ pthread_mutex_init(&(priv->mtx), NULL);
+ pthread_mutex_lock(&(priv->mtx));
+ pthread_create(&(priv->event_listener), NULL, event_loop, self);
+
+ printf("Enpoint listening.\n");
sleep(1);
return 0;