Browse Source

ktp: Callbacks for some events

Serj Kalichev 1 year ago
parent
commit
72e5ecd7ac
4 changed files with 192 additions and 59 deletions
  1. 54 0
      bin/klish/interactive.c
  2. 2 2
      bin/klish/klish.c
  3. 106 52
      klish/ktp/ktp_session.c
  4. 30 5
      klish/ktp_session.h

+ 54 - 0
bin/klish/interactive.c

@@ -1,16 +1,70 @@
 #include <stdlib.h>
 #include <stdio.h>
+#include <assert.h>
+#include <unistd.h>
 
+#include <faux/faux.h>
+#include <faux/eloop.h>
 #include <klish/ktp.h>
 #include <klish/ktp_session.h>
 
 
+// Context for main loop
+typedef struct ctx_s {
+	ktp_session_t *ktp;
+} ctx_t;
+
+
+bool_t cmd_ack_cb(ktp_session_t *ktp, const faux_msg_t *msg, void *udata);
+static bool_t stdin_cb(faux_eloop_t *eloop, faux_eloop_type_e type,
+	void *associated_data, void *user_data);
+
+
 int klish_interactive_shell(ktp_session_t *ktp)
 {
+	ctx_t ctx = {};
+	faux_eloop_t *eloop = NULL;
+
+	assert(ktp);
+	if (!ktp)
+		return -1;
 
+	// Don't stop interactive loop on each answer
+	ktp_session_set_stop_on_answer(ktp, BOOL_FALSE);
 
+	ctx.ktp = ktp;
 
+	ktp_session_set_cb(ktp, KTP_SESSION_CB_CMD_ACK, cmd_ack_cb, &ctx);
+	eloop = ktp_session_eloop(ktp);
+	faux_eloop_add_fd(eloop, STDIN_FILENO, POLLIN, stdin_cb, &ctx);
+	faux_eloop_loop(eloop);
 
 	return 0;
 }
 
+
+bool_t cmd_ack_cb(ktp_session_t *ktp, const faux_msg_t *msg, void *udata)
+{
+	printf("CMD ACK CB\n");
+
+	// Happy compiler
+	ktp = ktp;
+	msg = msg;
+	udata = udata;
+
+	ktp_session_set_done(ktp, BOOL_TRUE);
+
+	return BOOL_TRUE;
+}
+
+
+static bool_t stdin_cb(faux_eloop_t *eloop, faux_eloop_type_e type,
+	void *associated_data, void *udata)
+{
+	ctx_t *ctx = (ctx_t *)udata;
+
+	ktp_session_cmd(ctx->ktp, "cmd", NULL, BOOL_FALSE);
+
+	return BOOL_TRUE;
+}
+

+ 2 - 2
bin/klish/klish.c

@@ -73,8 +73,8 @@ int main(int argc, char **argv)
 		fprintf(stderr, "Error: Can't create klish session\n");
 		goto err;
 	}
-	ktp_session_set_stdout_cb(ktp, stdout_cb, NULL);
-	ktp_session_set_stderr_cb(ktp, stderr_cb, NULL);
+	ktp_session_set_cb(ktp, KTP_SESSION_CB_STDOUT, stdout_cb, NULL);
+	ktp_session_set_cb(ktp, KTP_SESSION_CB_STDERR, stderr_cb, NULL);
 
 	// Commands from cmdline
 	if (faux_list_len(opts->commands) > 0) {

+ 106 - 52
klish/ktp/ktp_session.c

@@ -15,14 +15,10 @@
 #include <klish/ktp_session.h>
 
 
-typedef enum {
-	KTP_SESSION_STATE_DISCONNECTED = 'd',
-	KTP_SESSION_STATE_UNAUTHORIZED = 'a',
-	KTP_SESSION_STATE_IDLE = 'i',
-	KTP_SESSION_STATE_WAIT_FOR_COMPLETION = 'v',
-	KTP_SESSION_STATE_WAIT_FOR_HELP = 'h',
-	KTP_SESSION_STATE_WAIT_FOR_CMD = 'c',
-} ktp_session_state_e;
+typedef struct cb_s {
+	void *fn;
+	void *udata;
+} cb_t;
 
 
 struct ktp_session_s {
@@ -31,16 +27,14 @@ struct ktp_session_s {
 	faux_hdr_t *hdr; // Service var: engine will receive header and then msg
 	bool_t done;
 	faux_eloop_t *eloop; // External eloop object
-	ktp_session_stdout_cb_fn stdout_cb;
-	void *stdout_udata;
-	ktp_session_stdout_cb_fn stderr_cb;
-	void *stderr_udata;
+	cb_t cb[KTP_SESSION_CB_MAX];
 	faux_error_t *error; // Internal
 	bool_t request_done;
 	int cmd_retcode; // Internal
 	bool_t cmd_retcode_available;
 	ktp_status_e cmd_features;
 	bool_t cmd_features_available;
+	bool_t stop_on_answer; // Stop the loop when answer is received (for non-interactive mode)
 };
 
 
@@ -69,6 +63,13 @@ ktp_session_t *ktp_session_new(int sock, faux_eloop_t *eloop)
 	ktp->state = KTP_SESSION_STATE_IDLE;
 	ktp->done = BOOL_FALSE;
 	ktp->eloop = eloop;
+	ktp->stop_on_answer = BOOL_TRUE; // Non-interactive by default
+	ktp->error = NULL;
+	ktp->cmd_retcode = -1;
+	ktp->cmd_retcode_available = BOOL_FALSE;
+	ktp->request_done = BOOL_FALSE;
+	ktp->cmd_features = KTP_STATUS_NONE;
+	ktp->cmd_features_available = BOOL_FALSE;
 
 	// Async object
 	ktp->async = faux_async_new(sock);
@@ -85,17 +86,7 @@ ktp_session_t *ktp_session_new(int sock, faux_eloop_t *eloop)
 		server_ev, ktp);
 
 	// Callbacks
-	ktp->stdout_cb = NULL;
-	ktp->stdout_udata = NULL;
-	ktp->stderr_cb = NULL;
-	ktp->stderr_udata = NULL;
-
-	ktp->error = NULL;
-	ktp->cmd_retcode = -1;
-	ktp->cmd_retcode_available = BOOL_FALSE;
-	ktp->request_done = BOOL_FALSE;
-	ktp->cmd_features = KTP_STATUS_NONE;
-	ktp->cmd_features_available = BOOL_FALSE;
+	// Callbacks ktp->cb are zeroed by faux_zmalloc()
 
 	return ktp;
 }
@@ -147,49 +138,69 @@ bool_t ktp_session_set_done(ktp_session_t *ktp, bool_t done)
 }
 
 
-ktp_status_e ktp_session_cmd_features(const ktp_session_t *ktp)
+bool_t ktp_session_stop_on_answer(const ktp_session_t *ktp)
 {
 	assert(ktp);
 	if (!ktp)
-		return KTP_STATUS_NONE;
+		return BOOL_TRUE; // Default
 
-	return ktp->cmd_features;
+	return ktp->stop_on_answer;
 }
 
 
-faux_error_t *ktp_session_error(const ktp_session_t *ktp)
+bool_t ktp_session_set_stop_on_answer(ktp_session_t *ktp, bool_t stop_on_answer)
 {
 	assert(ktp);
 	if (!ktp)
 		return BOOL_FALSE;
 
-	return ktp->error;
+	ktp->stop_on_answer = stop_on_answer;
+
+	return BOOL_TRUE;
 }
 
 
-bool_t ktp_session_set_stdout_cb(ktp_session_t *ktp,
-	ktp_session_stdout_cb_fn stdout_cb, void *stdout_udata)
+ktp_session_state_e ktp_session_state(const ktp_session_t *ktp)
 {
 	assert(ktp);
 	if (!ktp)
-		return BOOL_FALSE;
+		return KTP_SESSION_STATE_ERROR;
+
+	return ktp->state;
+}
 
-	ktp->stdout_cb = stdout_cb;
-	ktp->stdout_udata = stdout_udata;
 
-	return BOOL_TRUE;
+ktp_status_e ktp_session_cmd_features(const ktp_session_t *ktp)
+{
+	assert(ktp);
+	if (!ktp)
+		return KTP_STATUS_NONE;
+
+	return ktp->cmd_features;
 }
 
 
-bool_t ktp_session_set_stderr_cb(ktp_session_t *ktp,
-	ktp_session_stdout_cb_fn stderr_cb, void *stderr_udata)
+faux_error_t *ktp_session_error(const ktp_session_t *ktp)
 {
 	assert(ktp);
 	if (!ktp)
 		return BOOL_FALSE;
 
-	ktp->stderr_cb = stderr_cb;
-	ktp->stderr_udata = stderr_udata;
+	return ktp->error;
+}
+
+
+bool_t ktp_session_set_cb(ktp_session_t *ktp, ktp_session_cb_e cb_id,
+	void *fn, void *udata)
+{
+	assert(ktp);
+	if (!ktp)
+		return BOOL_FALSE;
+	if (cb_id >= KTP_SESSION_CB_MAX)
+		return BOOL_FALSE;
+
+	ktp->cb[cb_id].fn = fn;
+	ktp->cb[cb_id].udata = udata;
 
 	return BOOL_TRUE;
 }
@@ -255,8 +266,10 @@ static bool_t server_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 
 	type = type; // Happy compiler
 
-	if (ktp->request_done)
+	if (ktp->request_done && ktp->stop_on_answer)
 		return BOOL_FALSE; // Stop event loop on receiving answer
+	if (ktp->done)
+		return BOOL_FALSE; // Stop event loop on done flag (exit)
 
 	return BOOL_TRUE;
 }
@@ -270,15 +283,14 @@ static bool_t ktp_session_process_stdout(ktp_session_t *ktp, const faux_msg_t *m
 	assert(ktp);
 	assert(msg);
 
-	if (!ktp->stdout_cb)
+	if (!ktp->cb[KTP_SESSION_CB_STDOUT].fn)
 		return BOOL_TRUE; // Just ignore stdout. It's not a bug
-	if (!ktp->cmd_features_available)
-		return BOOL_TRUE; // Drop message
 
 	if (!faux_msg_get_param_by_type(msg, KTP_PARAM_LINE, (void **)&line, &len))
 		return BOOL_TRUE; // It's strange but not a bug
 
-	return ktp->stdout_cb(ktp, line, len, ktp->stdout_udata);
+	return ((ktp_session_stdout_cb_fn)ktp->cb[KTP_SESSION_CB_STDOUT].fn)(
+		ktp, line, len, ktp->cb[KTP_SESSION_CB_STDOUT].udata);
 }
 
 
@@ -290,16 +302,15 @@ static bool_t ktp_session_process_stderr(ktp_session_t *ktp, const faux_msg_t *m
 	assert(ktp);
 	assert(msg);
 
-	if (!ktp->stderr_cb)
+	if (!ktp->cb[KTP_SESSION_CB_STDERR].fn)
 		return BOOL_TRUE; // Just ignore message. It's not a bug
-	if (!ktp->cmd_features_available)
-		return BOOL_TRUE; // Drop message
 
 	if (!faux_msg_get_param_by_type(msg, KTP_PARAM_LINE,
 			(void **)&line, &len))
 		return BOOL_TRUE; // It's strange but not a bug
 
-	return ktp->stderr_cb(ktp, line, len, ktp->stderr_udata);
+	return ((ktp_session_stdout_cb_fn)ktp->cb[KTP_SESSION_CB_STDERR].fn)(
+		ktp, line, len, ktp->cb[KTP_SESSION_CB_STDERR].udata);
 }
 
 
@@ -320,6 +331,13 @@ static bool_t ktp_session_process_cmd_ack(ktp_session_t *ktp, const faux_msg_t *
 			ktp->cmd_features_available = BOOL_TRUE;
 			ktp->cmd_features = status & KTP_STATUS_INTERACTIVE;
 		}
+		// Execute external callback
+		if (ktp->cb[KTP_SESSION_CB_CMD_ACK_INCOMPLETED].fn)
+			((ktp_session_event_cb_fn)
+				ktp->cb[KTP_SESSION_CB_CMD_ACK_INCOMPLETED].fn)(
+				ktp, msg,
+				ktp->cb[KTP_SESSION_CB_CMD_ACK_INCOMPLETED].udata);
+
 		return BOOL_TRUE;
 	}
 
@@ -332,9 +350,31 @@ static bool_t ktp_session_process_cmd_ack(ktp_session_t *ktp, const faux_msg_t *
 		faux_str_free(error_str);
 	}
 	ktp->cmd_retcode_available = BOOL_TRUE; // Answer from server was received
+	ktp->request_done = BOOL_TRUE;
 	ktp->state = KTP_SESSION_STATE_IDLE;
-	ktp->error = NULL;
-	ktp->request_done = BOOL_TRUE; // Stop the loop
+	// Execute external callback
+	if (ktp->cb[KTP_SESSION_CB_CMD_ACK].fn)
+		((ktp_session_event_cb_fn)
+			ktp->cb[KTP_SESSION_CB_CMD_ACK].fn)(
+			ktp, msg,
+			ktp->cb[KTP_SESSION_CB_CMD_ACK].udata);
+
+	return BOOL_TRUE;
+}
+
+
+static bool_t ktp_session_process_exit(ktp_session_t *ktp, const faux_msg_t *msg)
+{
+	assert(ktp);
+	assert(msg);
+
+	ktp_session_set_done(ktp, BOOL_TRUE);
+	// Execute external callback
+	if (ktp->cb[KTP_SESSION_CB_EXIT].fn)
+		((ktp_session_event_cb_fn)
+			ktp->cb[KTP_SESSION_CB_EXIT].fn)(
+			ktp, msg,
+			ktp->cb[KTP_SESSION_CB_EXIT].udata);
 
 	return BOOL_TRUE;
 }
@@ -355,15 +395,29 @@ static bool_t ktp_session_dispatch(ktp_session_t *ktp, faux_msg_t *msg)
 	cmd = faux_msg_get_cmd(msg);
 	switch (cmd) {
 	case KTP_CMD_ACK:
-		if (KTP_SESSION_STATE_WAIT_FOR_CMD == ktp->state)
-			rc = ktp_session_process_cmd_ack(ktp, msg);
+		if (ktp->state != KTP_SESSION_STATE_WAIT_FOR_CMD) {
+			syslog(LOG_WARNING, "Unexpected KTP_CMD_ACK was received\n");
+			break;
+		}
+		rc = ktp_session_process_cmd_ack(ktp, msg);
 		break;
 	case KTP_STDOUT:
+		if (ktp->state != KTP_SESSION_STATE_WAIT_FOR_CMD) {
+			syslog(LOG_WARNING, "Unexpected KTP_STDOUT was received\n");
+			break;
+		}
 		rc = ktp_session_process_stdout(ktp, msg);
 		break;
 	case KTP_STDERR:
+		if (ktp->state != KTP_SESSION_STATE_WAIT_FOR_CMD) {
+			syslog(LOG_WARNING, "Unexpected KTP_STDERR was received\n");
+			break;
+		}
 		rc = ktp_session_process_stderr(ktp, msg);
 		break;
+	case KTP_EXIT: // Async event
+		rc = ktp_session_process_exit(ktp, msg);
+		break;
 	default:
 		syslog(LOG_WARNING, "Unsupported command: 0x%04u\n", cmd); // Ignore
 		break;
@@ -465,7 +519,7 @@ static bool_t ktp_session_req(ktp_session_t *ktp, ktp_cmd_e cmd,
 	ktp->error = error;
 	ktp->cmd_retcode = -1;
 	ktp->cmd_retcode_available = BOOL_FALSE;
-	ktp->request_done = BOOL_FALSE; // Be pessimistic
+	ktp->request_done = BOOL_FALSE;
 	ktp->cmd_features = KTP_STATUS_NONE;
 	ktp->cmd_features_available = BOOL_FALSE;
 

+ 30 - 5
klish/ktp_session.h

@@ -33,8 +33,32 @@ bool_t ktp_stall_cb(faux_async_t *async, size_t len, void *user_data);
 
 
 // Client KTP session
+
+typedef enum {
+	KTP_SESSION_STATE_ERROR = 'e', // Some unknown error
+	KTP_SESSION_STATE_DISCONNECTED = 'd',
+	KTP_SESSION_STATE_UNAUTHORIZED = 'a',
+	KTP_SESSION_STATE_IDLE = 'i',
+	KTP_SESSION_STATE_WAIT_FOR_COMPLETION = 'v',
+	KTP_SESSION_STATE_WAIT_FOR_HELP = 'h',
+	KTP_SESSION_STATE_WAIT_FOR_CMD = 'c',
+} ktp_session_state_e;
+
+typedef enum {
+	KTP_SESSION_CB_STDOUT,
+	KTP_SESSION_CB_STDERR,
+	KTP_SESSION_CB_CMD_ACK_INCOMPLETED,
+	KTP_SESSION_CB_CMD_ACK,
+	KTP_SESSION_CB_COMPLETION_ACK,
+	KTP_SESSION_CB_HELP_ACK,
+	KTP_SESSION_CB_EXIT,
+	KTP_SESSION_CB_MAX,
+} ktp_session_cb_e;
+
 typedef bool_t (*ktp_session_stdout_cb_fn)(ktp_session_t *ktp,
-	const char *line, size_t len, void *user_data);
+	const char *line, size_t len, void *udata);
+typedef bool_t (*ktp_session_event_cb_fn)(ktp_session_t *ktp,
+	const faux_msg_t *msg, void *udata);
 
 ktp_session_t *ktp_session_new(int sock, faux_eloop_t *eloop);
 void ktp_session_free(ktp_session_t *session);
@@ -43,12 +67,13 @@ faux_eloop_t *ktp_session_eloop(const ktp_session_t *ktp);
 bool_t ktp_session_done(const ktp_session_t *ktp);
 bool_t ktp_session_set_done(ktp_session_t *ktp, bool_t done);
 faux_error_t *ktp_session_error(const ktp_session_t *ktp);
-bool_t ktp_session_set_stdout_cb(ktp_session_t *ktp,
-	ktp_session_stdout_cb_fn stdout_cb, void *stdout_udata);
-bool_t ktp_session_set_stderr_cb(ktp_session_t *ktp,
-	ktp_session_stdout_cb_fn stderr_cb, void *stderr_udata);
+bool_t ktp_session_set_cb(ktp_session_t *ktp, ktp_session_cb_e cb_id,
+	void *fn, void *udata);
 bool_t ktp_session_connected(ktp_session_t *session);
 int ktp_session_fd(const ktp_session_t *session);
+bool_t ktp_session_stop_on_answer(const ktp_session_t *ktp);
+bool_t ktp_session_set_stop_on_answer(ktp_session_t *ktp, bool_t stop_on_answer);
+ktp_session_state_e ktp_session_state(const ktp_session_t *ktp);
 
 bool_t ktp_session_cmd(ktp_session_t *ktp, const char *line,
 	faux_error_t *error, bool_t dry_run);