aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJunio C Hamano <gitster@pobox.com>2025-11-08 10:33:20 -0800
committerJunio C Hamano <gitster@pobox.com>2025-11-08 10:33:20 -0800
commit0cea3ad7186bbdc21864fbabfd19cdf3ae08d9cb (patch)
tree78d0640d85537aff27695bc0ab88610cfbdc4014
parentMerge branch 'en/xdiff-cleanup-2' into seen (diff)
parentreceive-pack: convert receive hooks to hook API (diff)
downloadgit-0cea3ad7186bbdc21864fbabfd19cdf3ae08d9cb.tar.gz
git-0cea3ad7186bbdc21864fbabfd19cdf3ae08d9cb.zip
Merge branch 'ar/run-command-hook' into seen
Use hook API to replace ad-hoc invocation of hook scripts with the run_command() API. Comments? * ar/run-command-hook: receive-pack: convert receive hooks to hook API receive-pack: convert update hooks to new API hooks: allow callers to capture output run-command: allow capturing of collated output reference-transaction: use hook API instead of run-command hook: allow overriding the ungroup option transport: convert pre-push to hook API hook: convert 'post-rewrite' hook in sequencer.c to hook API hook: provide stdin via callback run-command: add stdin callback for parallelization
-rw-r--r--builtin/hook.c6
-rw-r--r--builtin/receive-pack.c289
-rw-r--r--commit.c3
-rw-r--r--hook.c21
-rw-r--r--hook.h36
-rw-r--r--refs.c101
-rw-r--r--run-command.c110
-rw-r--r--run-command.h42
-rw-r--r--sequencer.c42
-rw-r--r--t/helper/test-run-command.c67
-rwxr-xr-xt/t0061-run-command.sh38
-rw-r--r--transport.c83
12 files changed, 557 insertions, 281 deletions
diff --git a/builtin/hook.c b/builtin/hook.c
index 7afec380d2..73e7b8c2e8 100644
--- a/builtin/hook.c
+++ b/builtin/hook.c
@@ -44,6 +44,12 @@ static int run(int argc, const char **argv, const char *prefix,
goto usage;
/*
+ * All current "hook run" use-cases require ungrouped child output.
+ * If this changes, a hook run argument can be added to toggle it.
+ */
+ opt.ungroup = 1;
+
+ /*
* Having a -- for "run" when providing <hook-args> is
* mandatory.
*/
diff --git a/builtin/receive-pack.c b/builtin/receive-pack.c
index e8ee0e7321..cfc1eeedd5 100644
--- a/builtin/receive-pack.c
+++ b/builtin/receive-pack.c
@@ -748,7 +748,7 @@ static int check_cert_push_options(const struct string_list *push_options)
return retval;
}
-static void prepare_push_cert_sha1(struct child_process *proc)
+static void prepare_push_cert_sha1(struct run_hooks_opt *opt)
{
static int already_done;
@@ -774,147 +774,132 @@ static void prepare_push_cert_sha1(struct child_process *proc)
nonce_status = check_nonce(sigcheck.payload);
}
if (!is_null_oid(&push_cert_oid)) {
- strvec_pushf(&proc->env, "GIT_PUSH_CERT=%s",
+ strvec_pushf(&opt->env, "GIT_PUSH_CERT=%s",
oid_to_hex(&push_cert_oid));
- strvec_pushf(&proc->env, "GIT_PUSH_CERT_SIGNER=%s",
+ strvec_pushf(&opt->env, "GIT_PUSH_CERT_SIGNER=%s",
sigcheck.signer ? sigcheck.signer : "");
- strvec_pushf(&proc->env, "GIT_PUSH_CERT_KEY=%s",
+ strvec_pushf(&opt->env, "GIT_PUSH_CERT_KEY=%s",
sigcheck.key ? sigcheck.key : "");
- strvec_pushf(&proc->env, "GIT_PUSH_CERT_STATUS=%c",
+ strvec_pushf(&opt->env, "GIT_PUSH_CERT_STATUS=%c",
sigcheck.result);
if (push_cert_nonce) {
- strvec_pushf(&proc->env,
+ strvec_pushf(&opt->env,
"GIT_PUSH_CERT_NONCE=%s",
push_cert_nonce);
- strvec_pushf(&proc->env,
+ strvec_pushf(&opt->env,
"GIT_PUSH_CERT_NONCE_STATUS=%s",
nonce_status);
if (nonce_status == NONCE_SLOP)
- strvec_pushf(&proc->env,
+ strvec_pushf(&opt->env,
"GIT_PUSH_CERT_NONCE_SLOP=%ld",
nonce_stamp_slop);
}
}
}
+struct receive_hook_feed_context {
+ struct command *cmd;
+ int skip_broken;
+};
+
struct receive_hook_feed_state {
struct command *cmd;
struct ref_push_report *report;
int skip_broken;
struct strbuf buf;
- const struct string_list *push_options;
};
-typedef int (*feed_fn)(void *, const char **, size_t *);
-static int run_and_feed_hook(const char *hook_name, feed_fn feed,
- struct receive_hook_feed_state *feed_state)
+static int feed_receive_hook(int hook_stdin_fd, struct receive_hook_feed_state *state, int lines_batch_size)
{
- struct child_process proc = CHILD_PROCESS_INIT;
- struct async muxer;
- int code;
- const char *hook_path = find_hook(the_repository, hook_name);
+ struct command *cmd = state->cmd;
- if (!hook_path)
- return 0;
+ strbuf_reset(&state->buf);
- strvec_push(&proc.args, hook_path);
- proc.in = -1;
- proc.stdout_to_stderr = 1;
- proc.trace2_hook_name = hook_name;
-
- if (feed_state->push_options) {
- size_t i;
- for (i = 0; i < feed_state->push_options->nr; i++)
- strvec_pushf(&proc.env,
- "GIT_PUSH_OPTION_%"PRIuMAX"=%s",
- (uintmax_t)i,
- feed_state->push_options->items[i].string);
- strvec_pushf(&proc.env, "GIT_PUSH_OPTION_COUNT=%"PRIuMAX"",
- (uintmax_t)feed_state->push_options->nr);
- } else
- strvec_pushf(&proc.env, "GIT_PUSH_OPTION_COUNT");
+ /* batch lines to avoid going through run-command's ppoll for each line */
+ for (int i = 0; i < lines_batch_size; i++) {
+ while (cmd &&
+ state->skip_broken && (cmd->error_string || cmd->did_not_exist))
+ cmd = cmd->next;
- if (tmp_objdir)
- strvec_pushv(&proc.env, tmp_objdir_env(tmp_objdir));
+ if (!cmd)
+ break; /* no more commands left */
- if (use_sideband) {
- memset(&muxer, 0, sizeof(muxer));
- muxer.proc = copy_to_sideband;
- muxer.in = -1;
- code = start_async(&muxer);
- if (code)
- return code;
- proc.err = muxer.in;
- }
+ if (!state->report)
+ state->report = cmd->report;
- prepare_push_cert_sha1(&proc);
+ if (state->report) {
+ struct object_id *old_oid;
+ struct object_id *new_oid;
+ const char *ref_name;
- code = start_command(&proc);
- if (code) {
- if (use_sideband)
- finish_async(&muxer);
- return code;
- }
+ old_oid = state->report->old_oid ? state->report->old_oid : &cmd->old_oid;
+ new_oid = state->report->new_oid ? state->report->new_oid : &cmd->new_oid;
+ ref_name = state->report->ref_name ? state->report->ref_name : cmd->ref_name;
- sigchain_push(SIGPIPE, SIG_IGN);
+ strbuf_addf(&state->buf, "%s %s %s\n",
+ oid_to_hex(old_oid), oid_to_hex(new_oid),
+ ref_name);
- while (1) {
- const char *buf;
- size_t n;
- if (feed(feed_state, &buf, &n))
- break;
- if (write_in_full(proc.in, buf, n) < 0)
- break;
+ state->report = state->report->next;
+ if (!state->report)
+ cmd = cmd->next;
+ } else {
+ strbuf_addf(&state->buf, "%s %s %s\n",
+ oid_to_hex(&cmd->old_oid), oid_to_hex(&cmd->new_oid),
+ cmd->ref_name);
+ cmd = cmd->next;
+ }
}
- close(proc.in);
- if (use_sideband)
- finish_async(&muxer);
- sigchain_pop(SIGPIPE);
+ state->cmd = cmd;
- return finish_command(&proc);
+ if (state->buf.len > 0) {
+ int ret = write_in_full(hook_stdin_fd, state->buf.buf, state->buf.len);
+ if (ret < 0) {
+ if (errno == EPIPE)
+ return 1; /* child closed pipe */
+ return ret;
+ }
+ }
+
+ return state->cmd ? 0 : 1; /* 0 = more to come, 1 = EOF */
}
-static int feed_receive_hook(void *state_, const char **bufp, size_t *sizep)
+static int feed_receive_hook_cb(int hook_stdin_fd, void *pp_cb, void *pp_task_cb UNUSED)
{
- struct receive_hook_feed_state *state = state_;
- struct command *cmd = state->cmd;
+ struct hook_cb_data *hook_cb = pp_cb;
+ struct receive_hook_feed_state *feed_state = hook_cb->options->feed_pipe_cb_data;
- while (cmd &&
- state->skip_broken && (cmd->error_string || cmd->did_not_exist))
- cmd = cmd->next;
- if (!cmd)
- return -1; /* EOF */
- if (!bufp)
- return 0; /* OK, can feed something. */
- strbuf_reset(&state->buf);
- if (!state->report)
- state->report = cmd->report;
- if (state->report) {
- struct object_id *old_oid;
- struct object_id *new_oid;
- const char *ref_name;
-
- old_oid = state->report->old_oid ? state->report->old_oid : &cmd->old_oid;
- new_oid = state->report->new_oid ? state->report->new_oid : &cmd->new_oid;
- ref_name = state->report->ref_name ? state->report->ref_name : cmd->ref_name;
- strbuf_addf(&state->buf, "%s %s %s\n",
- oid_to_hex(old_oid), oid_to_hex(new_oid),
- ref_name);
- state->report = state->report->next;
- if (!state->report)
- state->cmd = cmd->next;
- } else {
- strbuf_addf(&state->buf, "%s %s %s\n",
- oid_to_hex(&cmd->old_oid), oid_to_hex(&cmd->new_oid),
- cmd->ref_name);
- state->cmd = cmd->next;
- }
- if (bufp) {
- *bufp = state->buf.buf;
- *sizep = state->buf.len;
+ /* first-time setup */
+ if (!hook_cb->options->feed_pipe_cb_data) {
+ struct receive_hook_feed_context *ctx = hook_cb->options->feed_pipe_ctx;
+ if (!ctx)
+ BUG("run_hooks_opt.feed_pipe_ctx required for receive hook");
+
+ hook_cb->options->feed_pipe_cb_data = xmalloc(sizeof(struct receive_hook_feed_state));
+ feed_state = hook_cb->options->feed_pipe_cb_data;
+ strbuf_init(&feed_state->buf, 0);
+ feed_state->cmd = ctx->cmd;
+ feed_state->skip_broken = ctx->skip_broken;
+ feed_state->report = NULL;
}
- return 0;
+
+ /* batch 500 lines at once to avoid going through the run-command ppoll loop too often */
+ if (feed_receive_hook(hook_stdin_fd, feed_state, 500) == 0)
+ return 0; /* still have more data to feed */
+
+ strbuf_release(&feed_state->buf);
+
+ if (hook_cb->options->feed_pipe_cb_data)
+ FREE_AND_NULL(hook_cb->options->feed_pipe_cb_data);
+
+ return 1; /* done feeding, run-command can close pipe */
+}
+
+static void hook_output_to_sideband(struct strbuf *output, void *cb_data UNUSED)
+{
+ if (output && output->len)
+ send_sideband(1, 2, output->buf, output->len, use_sideband);
}
static int run_receive_hook(struct command *commands,
@@ -922,47 +907,58 @@ static int run_receive_hook(struct command *commands,
int skip_broken,
const struct string_list *push_options)
{
- struct receive_hook_feed_state state;
- int status;
+ struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
+ struct receive_hook_feed_context ctx;
+ struct command *iter = commands;
- strbuf_init(&state.buf, 0);
- state.cmd = commands;
- state.skip_broken = skip_broken;
- state.report = NULL;
- if (feed_receive_hook(&state, NULL, NULL))
+ /* if there are no valid commands, don't invoke the hook at all. */
+ while (iter && skip_broken && (iter->error_string || iter->did_not_exist))
+ iter = iter->next;
+ if (!iter)
return 0;
- state.cmd = commands;
- state.push_options = push_options;
- status = run_and_feed_hook(hook_name, feed_receive_hook, &state);
- strbuf_release(&state.buf);
- return status;
+
+ if (push_options) {
+ int i;
+ for (i = 0; i < push_options->nr; i++)
+ strvec_pushf(&opt.env, "GIT_PUSH_OPTION_%d=%s", i,
+ push_options->items[i].string);
+ strvec_pushf(&opt.env, "GIT_PUSH_OPTION_COUNT=%"PRIuMAX"",
+ (uintmax_t)push_options->nr);
+ } else
+ strvec_push(&opt.env, "GIT_PUSH_OPTION_COUNT");
+
+ if (tmp_objdir)
+ strvec_pushv(&opt.env, tmp_objdir_env(tmp_objdir));
+
+ prepare_push_cert_sha1(&opt);
+
+ /* set up sideband printer */
+ if (use_sideband)
+ opt.consume_sideband = hook_output_to_sideband;
+
+ /* set up stdin callback */
+ ctx.cmd = commands;
+ ctx.skip_broken = skip_broken;
+ opt.feed_pipe = feed_receive_hook_cb;
+ opt.feed_pipe_ctx = &ctx;
+
+ return run_hooks_opt(the_repository, hook_name, &opt);
}
static int run_update_hook(struct command *cmd)
{
- struct child_process proc = CHILD_PROCESS_INIT;
- int code;
- const char *hook_path = find_hook(the_repository, "update");
-
- if (!hook_path)
- return 0;
-
- strvec_push(&proc.args, hook_path);
- strvec_push(&proc.args, cmd->ref_name);
- strvec_push(&proc.args, oid_to_hex(&cmd->old_oid));
- strvec_push(&proc.args, oid_to_hex(&cmd->new_oid));
+ struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
- proc.no_stdin = 1;
- proc.stdout_to_stderr = 1;
- proc.err = use_sideband ? -1 : 0;
- proc.trace2_hook_name = "update";
+ strvec_pushl(&opt.args,
+ cmd->ref_name,
+ oid_to_hex(&cmd->old_oid),
+ oid_to_hex(&cmd->new_oid),
+ NULL);
- code = start_command(&proc);
- if (code)
- return code;
if (use_sideband)
- copy_to_sideband(proc.err, -1, NULL);
- return finish_command(&proc);
+ opt.consume_sideband = hook_output_to_sideband;
+
+ return run_hooks_opt(the_repository, "update", &opt);
}
static struct command *find_command_by_refname(struct command *list,
@@ -1639,33 +1635,20 @@ out:
static void run_update_post_hook(struct command *commands)
{
struct command *cmd;
- struct child_process proc = CHILD_PROCESS_INIT;
- const char *hook;
-
- hook = find_hook(the_repository, "post-update");
- if (!hook)
- return;
+ struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
for (cmd = commands; cmd; cmd = cmd->next) {
if (cmd->error_string || cmd->did_not_exist)
continue;
- if (!proc.args.nr)
- strvec_push(&proc.args, hook);
- strvec_push(&proc.args, cmd->ref_name);
+ strvec_push(&opt.args, cmd->ref_name);
}
- if (!proc.args.nr)
+ if (!opt.args.nr)
return;
- proc.no_stdin = 1;
- proc.stdout_to_stderr = 1;
- proc.err = use_sideband ? -1 : 0;
- proc.trace2_hook_name = "post-update";
+ if (use_sideband)
+ opt.consume_sideband = hook_output_to_sideband;
- if (!start_command(&proc)) {
- if (use_sideband)
- copy_to_sideband(proc.err, -1, NULL);
- finish_command(&proc);
- }
+ run_hooks_opt(the_repository, "post-update", &opt);
}
static void check_aliased_update_internal(struct command *cmd,
diff --git a/commit.c b/commit.c
index 16d91b2bfc..7da33dde86 100644
--- a/commit.c
+++ b/commit.c
@@ -1965,6 +1965,9 @@ int run_commit_hook(int editor_is_used, const char *index_file,
strvec_push(&opt.args, arg);
va_end(args);
+ /* All commit hook use-cases require ungrouping child output. */
+ opt.ungroup = 1;
+
opt.invoked_hook = invoked_hook;
return run_hooks_opt(the_repository, name, &opt);
}
diff --git a/hook.c b/hook.c
index b3de1048bf..fb452b5369 100644
--- a/hook.c
+++ b/hook.c
@@ -65,11 +65,22 @@ static int pick_next_hook(struct child_process *cp,
cp->no_stdin = 1;
strvec_pushv(&cp->env, hook_cb->options->env.v);
+
+ if (hook_cb->options->path_to_stdin && hook_cb->options->feed_pipe)
+ BUG("options path_to_stdin and feed_pipe are mutually exclusive");
+
/* reopen the file for stdin; run_command closes it. */
if (hook_cb->options->path_to_stdin) {
cp->no_stdin = 0;
cp->in = xopen(hook_cb->options->path_to_stdin, O_RDONLY);
}
+
+ if (hook_cb->options->feed_pipe) {
+ cp->no_stdin = 0;
+ /* start_command() will allocate a pipe / stdin fd for us */
+ cp->in = -1;
+ }
+
cp->stdout_to_stderr = 1;
cp->trace2_hook_name = hook_cb->hook_name;
cp->dir = hook_cb->options->dir;
@@ -136,10 +147,12 @@ int run_hooks_opt(struct repository *r, const char *hook_name,
.tr2_label = hook_name,
.processes = 1,
- .ungroup = 1,
+ .ungroup = options->ungroup,
.get_next_task = pick_next_hook,
.start_failure = notify_start_failure,
+ .feed_pipe = options->feed_pipe,
+ .consume_sideband = options->consume_sideband,
.task_finished = notify_hook_finished,
.data = &cb_data,
@@ -148,6 +161,9 @@ int run_hooks_opt(struct repository *r, const char *hook_name,
if (!options)
BUG("a struct run_hooks_opt must be provided to run_hooks");
+ if (options->path_to_stdin && options->feed_pipe)
+ BUG("choose only one method to populate hook stdin");
+
if (options->invoked_hook)
*options->invoked_hook = 0;
@@ -177,6 +193,9 @@ int run_hooks(struct repository *r, const char *hook_name)
{
struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
+ /* All use-cases of this API require ungrouping. */
+ opt.ungroup = 1;
+
return run_hooks_opt(r, hook_name, &opt);
}
diff --git a/hook.h b/hook.h
index 11863fa734..a84e97db34 100644
--- a/hook.h
+++ b/hook.h
@@ -1,6 +1,7 @@
#ifndef HOOK_H
#define HOOK_H
#include "strvec.h"
+#include "run-command.h"
struct repository;
@@ -34,9 +35,44 @@ struct run_hooks_opt
int *invoked_hook;
/**
+ * Allow hooks to set run_processes_parallel() 'ungroup' behavior.
+ */
+ unsigned int ungroup:1;
+
+ /**
* Path to file which should be piped to stdin for each hook.
*/
const char *path_to_stdin;
+
+ /**
+ * Callback to ask for more content to pipe to each hook stdin.
+ *
+ * If a hook needs to consume large quantities of data (e.g. a
+ * list of all refs received in a client push), feeding data via
+ * in-memory strings or slurping to/from files via path_to_stdin
+ * is inefficient, so this callback allows for piecemeal writes.
+ *
+ * Add initalization context to hook.feed_pipe_ctx.
+ *
+ * The caller owns hook.feed_pipe_ctx and has to release any
+ * resources after hooks finish execution.
+ */
+ feed_pipe_fn feed_pipe;
+ void *feed_pipe_ctx;
+
+ /**
+ * Use this to keep internal state for your feed_pipe_fn callback.
+ * Only useful when using run_hooks_opt.feed_pipe, otherwise ignore it.
+ */
+ void *feed_pipe_cb_data;
+
+ /*
+ * Populate this to capture output and prevent it from being printed to
+ * stderr. This will be passed directly through to
+ * run_command:run_parallel_processes(). See t/helper/test-run-command.c
+ * for an example.
+ */
+ consume_sideband_fn consume_sideband;
};
#define RUN_HOOKS_OPT_INIT { \
diff --git a/refs.c b/refs.c
index 5583f6e09d..2b699cb44e 100644
--- a/refs.c
+++ b/refs.c
@@ -2422,68 +2422,73 @@ static int ref_update_reject_duplicates(struct string_list *refnames,
return 0;
}
-static int run_transaction_hook(struct ref_transaction *transaction,
- const char *state)
+struct transaction_feed_cb_data {
+ size_t index;
+ struct strbuf buf;
+};
+
+static int transaction_hook_feed_stdin(int hook_stdin_fd, void *pp_cb, void *pp_task_cb UNUSED)
{
- struct child_process proc = CHILD_PROCESS_INIT;
- struct strbuf buf = STRBUF_INIT;
- const char *hook;
- int ret = 0;
+ struct hook_cb_data *hook_cb = pp_cb;
+ struct run_hooks_opt *opt = hook_cb->options;
+ struct ref_transaction *transaction = opt->feed_pipe_ctx;
+ struct transaction_feed_cb_data *feed_cb_data = opt->feed_pipe_cb_data;
+ struct strbuf *buf = &feed_cb_data->buf;
+ struct ref_update *update;
+ size_t i = feed_cb_data->index++;
+ int ret;
- hook = find_hook(transaction->ref_store->repo, "reference-transaction");
- if (!hook)
- return ret;
+ if (i >= transaction->nr)
+ return 1; /* No more refs to process */
- strvec_pushl(&proc.args, hook, state, NULL);
- proc.in = -1;
- proc.stdout_to_stderr = 1;
- proc.trace2_hook_name = "reference-transaction";
+ update = transaction->updates[i];
- ret = start_command(&proc);
- if (ret)
- return ret;
+ if (update->flags & REF_LOG_ONLY)
+ return 0;
- sigchain_push(SIGPIPE, SIG_IGN);
+ strbuf_reset(buf);
- for (size_t i = 0; i < transaction->nr; i++) {
- struct ref_update *update = transaction->updates[i];
+ if (!(update->flags & REF_HAVE_OLD))
+ strbuf_addf(buf, "%s ", oid_to_hex(null_oid(the_hash_algo)));
+ else if (update->old_target)
+ strbuf_addf(buf, "ref:%s ", update->old_target);
+ else
+ strbuf_addf(buf, "%s ", oid_to_hex(&update->old_oid));
- if (update->flags & REF_LOG_ONLY)
- continue;
+ if (!(update->flags & REF_HAVE_NEW))
+ strbuf_addf(buf, "%s ", oid_to_hex(null_oid(the_hash_algo)));
+ else if (update->new_target)
+ strbuf_addf(buf, "ref:%s ", update->new_target);
+ else
+ strbuf_addf(buf, "%s ", oid_to_hex(&update->new_oid));
- strbuf_reset(&buf);
+ strbuf_addf(buf, "%s\n", update->refname);
- if (!(update->flags & REF_HAVE_OLD))
- strbuf_addf(&buf, "%s ", oid_to_hex(null_oid(the_hash_algo)));
- else if (update->old_target)
- strbuf_addf(&buf, "ref:%s ", update->old_target);
- else
- strbuf_addf(&buf, "%s ", oid_to_hex(&update->old_oid));
+ ret = write_in_full(hook_stdin_fd, buf->buf, buf->len);
+ if (ret < 0 && errno != EPIPE)
+ return ret;
- if (!(update->flags & REF_HAVE_NEW))
- strbuf_addf(&buf, "%s ", oid_to_hex(null_oid(the_hash_algo)));
- else if (update->new_target)
- strbuf_addf(&buf, "ref:%s ", update->new_target);
- else
- strbuf_addf(&buf, "%s ", oid_to_hex(&update->new_oid));
+ return 0; /* no more input to feed */
+}
+
+static int run_transaction_hook(struct ref_transaction *transaction,
+ const char *state)
+{
+ struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
+ struct transaction_feed_cb_data feed_ctx = { 0 };
+ int ret = 0;
- strbuf_addf(&buf, "%s\n", update->refname);
+ strvec_push(&opt.args, state);
- if (write_in_full(proc.in, buf.buf, buf.len) < 0) {
- if (errno != EPIPE) {
- /* Don't leak errno outside this API */
- errno = 0;
- ret = -1;
- }
- break;
- }
- }
+ opt.feed_pipe = transaction_hook_feed_stdin;
+ opt.feed_pipe_ctx = transaction;
+ opt.feed_pipe_cb_data = &feed_ctx;
- close(proc.in);
- sigchain_pop(SIGPIPE);
- strbuf_release(&buf);
+ strbuf_init(&feed_ctx.buf, 0);
+
+ ret = run_hooks_opt(transaction->ref_store->repo, "reference-transaction", &opt);
- ret |= finish_command(&proc);
+ strbuf_release(&feed_ctx.buf);
return ret;
}
diff --git a/run-command.c b/run-command.c
index ed9575bd6a..f217adcad6 100644
--- a/run-command.c
+++ b/run-command.c
@@ -1578,7 +1578,10 @@ static void pp_cleanup(struct parallel_processes *pp,
* When get_next_task added messages to the buffer in its last
* iteration, the buffered output is non empty.
*/
- strbuf_write(&pp->buffered_output, stderr);
+ if (opts->consume_sideband)
+ opts->consume_sideband(&pp->buffered_output, opts->data);
+ else
+ strbuf_write(&pp->buffered_output, stderr);
strbuf_release(&pp->buffered_output);
sigchain_pop_common();
@@ -1652,6 +1655,44 @@ static int pp_start_one(struct parallel_processes *pp,
return 0;
}
+static void pp_buffer_stdin(struct parallel_processes *pp,
+ const struct run_process_parallel_opts *opts)
+{
+ /* Buffer stdin for each pipe. */
+ for (ssize_t i = 0; i < opts->processes; i++) {
+ struct child_process *proc = &pp->children[i].process;
+ int ret;
+
+ if (pp->children[i].state != GIT_CP_WORKING || proc->in <= 0)
+ continue;
+
+ /*
+ * child input is provided via path_to_stdin when the feed_pipe cb is
+ * missing, so we just signal an EOF.
+ */
+ if (!opts->feed_pipe) {
+ close(proc->in);
+ proc->in = 0;
+ continue;
+ }
+
+ /**
+ * Feed the pipe:
+ * ret < 0 means error
+ * ret == 0 means there is more data to be fed
+ * ret > 0 means feeding finished
+ */
+ ret = opts->feed_pipe(proc->in, opts->data, pp->children[i].data);
+ if (ret < 0)
+ die_errno("feed_pipe");
+
+ if (ret) {
+ close(proc->in);
+ proc->in = 0;
+ }
+ }
+}
+
static void pp_buffer_stderr(struct parallel_processes *pp,
const struct run_process_parallel_opts *opts,
int output_timeout)
@@ -1679,13 +1720,17 @@ static void pp_buffer_stderr(struct parallel_processes *pp,
}
}
-static void pp_output(const struct parallel_processes *pp)
+static void pp_output(const struct parallel_processes *pp,
+ const struct run_process_parallel_opts *opts)
{
size_t i = pp->output_owner;
if (pp->children[i].state == GIT_CP_WORKING &&
pp->children[i].err.len) {
- strbuf_write(&pp->children[i].err, stderr);
+ if (opts->consume_sideband)
+ opts->consume_sideband(&pp->children[i].err, opts->data);
+ else
+ strbuf_write(&pp->children[i].err, stderr);
strbuf_reset(&pp->children[i].err);
}
}
@@ -1722,6 +1767,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
pp->children[i].state = GIT_CP_FREE;
if (pp->pfd)
pp->pfd[i].fd = -1;
+ pp->children[i].process.in = 0;
child_process_init(&pp->children[i].process);
if (opts->ungroup) {
@@ -1732,11 +1778,15 @@ static int pp_collect_finished(struct parallel_processes *pp,
} else {
const size_t n = opts->processes;
- strbuf_write(&pp->children[i].err, stderr);
+ /* Output errors, then all other finished child processes */
+ if (opts->consume_sideband) {
+ opts->consume_sideband(&pp->children[i].err, opts->data);
+ opts->consume_sideband(&pp->buffered_output, opts->data);
+ } else {
+ strbuf_write(&pp->children[i].err, stderr);
+ strbuf_write(&pp->buffered_output, stderr);
+ }
strbuf_reset(&pp->children[i].err);
-
- /* Output all other finished child processes */
- strbuf_write(&pp->buffered_output, stderr);
strbuf_reset(&pp->buffered_output);
/*
@@ -1756,6 +1806,32 @@ static int pp_collect_finished(struct parallel_processes *pp,
return result;
}
+static void pp_handle_child_IO(struct parallel_processes *pp,
+ const struct run_process_parallel_opts *opts,
+ int output_timeout)
+{
+ /*
+ * First push input, if any (it might no-op), to child tasks to avoid them blocking
+ * after input. This also prevents deadlocks when ungrouping below, if a child blocks
+ * while the parent also waits for them to finish.
+ */
+ pp_buffer_stdin(pp, opts);
+
+ if (opts->ungroup) {
+ for (size_t i = 0; i < opts->processes; i++) {
+ int child_ready_for_cleanup =
+ pp->children[i].state == GIT_CP_WORKING &&
+ pp->children[i].process.in == 0;
+
+ if (child_ready_for_cleanup)
+ pp->children[i].state = GIT_CP_WAIT_CLEANUP;
+ }
+ } else {
+ pp_buffer_stderr(pp, opts, output_timeout);
+ pp_output(pp, opts);
+ }
+}
+
void run_processes_parallel(const struct run_process_parallel_opts *opts)
{
int i, code;
@@ -1775,6 +1851,16 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
"max:%"PRIuMAX,
(uintmax_t)opts->processes);
+ if (opts->ungroup && opts->consume_sideband)
+ BUG("ungroup and reading sideband are mutualy exclusive");
+
+ /*
+ * Child tasks might receive input via stdin, terminating early (or not), so
+ * ignore the default SIGPIPE which gets handled by each feed_pipe_fn which
+ * actually writes the data to children stdin fds.
+ */
+ sigchain_push(SIGPIPE, SIG_IGN);
+
pp_init(&pp, opts, &pp_sig);
while (1) {
for (i = 0;
@@ -1792,13 +1878,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
}
if (!pp.nr_processes)
break;
- if (opts->ungroup) {
- for (size_t i = 0; i < opts->processes; i++)
- pp.children[i].state = GIT_CP_WAIT_CLEANUP;
- } else {
- pp_buffer_stderr(&pp, opts, output_timeout);
- pp_output(&pp);
- }
+ pp_handle_child_IO(&pp, opts, output_timeout);
code = pp_collect_finished(&pp, opts);
if (code) {
pp.shutdown = 1;
@@ -1809,6 +1889,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
pp_cleanup(&pp, opts);
+ sigchain_pop(SIGPIPE);
+
if (do_trace2)
trace2_region_leave(tr2_category, tr2_label, NULL);
}
diff --git a/run-command.h b/run-command.h
index 0df25e445f..2c2484478b 100644
--- a/run-command.h
+++ b/run-command.h
@@ -421,6 +421,36 @@ typedef int (*start_failure_fn)(struct strbuf *out,
void *pp_task_cb);
/**
+ * This callback is repeatedly called on every child process who requests
+ * start_command() to create a pipe by setting child_process.in < 0.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel, and
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ * The contents of 'send' will be read into the pipe and passed to the pipe.
+ *
+ * Returns < 0 for error
+ * Returns == 0 when there is more data to be fed (will be called again)
+ * Returns > 0 when finished (child closed fd or no more data to be fed)
+ */
+typedef int (*feed_pipe_fn)(int child_in,
+ void *pp_cb,
+ void *pp_task_cb);
+
+/**
+ * If this callback is provided, instead of collating process output to stderr,
+ * they will be collated into a new pipe. consume_sideband_fn will be called
+ * repeatedly. When output is available on that pipe, it will be contained in
+ * 'output'. But it will be called with an empty 'output' too, to allow for
+ * keepalives or similar operations if necessary.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel.
+ *
+ * Since this callback is provided with the collated output, no task cookie is
+ * provided.
+ */
+typedef void (*consume_sideband_fn)(struct strbuf *output, void *pp_cb);
+
+/**
* This callback is called on every child process that finished processing.
*
* See run_processes_parallel() below for a discussion of the "struct
@@ -473,6 +503,18 @@ struct run_process_parallel_opts
*/
start_failure_fn start_failure;
+ /*
+ * feed_pipe: see feed_pipe_fn() above. This can be NULL to omit any
+ * special handling.
+ */
+ feed_pipe_fn feed_pipe;
+
+ /*
+ * consume_sideband: see consume_sideband_fn() above. This can be NULL
+ * to omit any special handling.
+ */
+ consume_sideband_fn consume_sideband;
+
/**
* task_finished: See task_finished_fn() above. This can be
* NULL to omit any special handling.
diff --git a/sequencer.c b/sequencer.c
index 5476d39ba9..71ed31c774 100644
--- a/sequencer.c
+++ b/sequencer.c
@@ -1292,32 +1292,40 @@ int update_head_with_reflog(const struct commit *old_head,
return ret;
}
+static int pipe_from_strbuf(int hook_stdin_fd, void *pp_cb, void *pp_task_cb UNUSED)
+{
+ struct hook_cb_data *hook_cb = pp_cb;
+ struct strbuf *to_pipe = hook_cb->options->feed_pipe_ctx;
+ int ret;
+
+ if (!to_pipe)
+ BUG("pipe_from_strbuf called without feed_pipe_ctx");
+
+ ret = write_in_full(hook_stdin_fd, to_pipe->buf, to_pipe->len);
+ if (ret < 0 && errno != EPIPE)
+ return ret;
+
+ return 1; /* done writing */
+}
+
static int run_rewrite_hook(const struct object_id *oldoid,
const struct object_id *newoid)
{
- struct child_process proc = CHILD_PROCESS_INIT;
+ struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
int code;
struct strbuf sb = STRBUF_INIT;
- const char *hook_path = find_hook(the_repository, "post-rewrite");
- if (!hook_path)
- return 0;
+ strbuf_addf(&sb, "%s %s\n", oid_to_hex(oldoid), oid_to_hex(newoid));
- strvec_pushl(&proc.args, hook_path, "amend", NULL);
- proc.in = -1;
- proc.stdout_to_stderr = 1;
- proc.trace2_hook_name = "post-rewrite";
+ opt.feed_pipe_ctx = &sb;
+ opt.feed_pipe = pipe_from_strbuf;
+
+ strvec_push(&opt.args, "amend");
+
+ code = run_hooks_opt(the_repository, "post-rewrite", &opt);
- code = start_command(&proc);
- if (code)
- return code;
- strbuf_addf(&sb, "%s %s\n", oid_to_hex(oldoid), oid_to_hex(newoid));
- sigchain_push(SIGPIPE, SIG_IGN);
- write_in_full(proc.in, sb.buf, sb.len);
- close(proc.in);
strbuf_release(&sb);
- sigchain_pop(SIGPIPE);
- return finish_command(&proc);
+ return code;
}
void commit_post_rewrite(struct repository *r,
diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
index 3719f23cc2..95152a0395 100644
--- a/t/helper/test-run-command.c
+++ b/t/helper/test-run-command.c
@@ -23,19 +23,26 @@ static int number_callbacks;
static int parallel_next(struct child_process *cp,
struct strbuf *err,
void *cb,
- void **task_cb UNUSED)
+ void **task_cb)
{
struct child_process *d = cb;
if (number_callbacks >= 4)
return 0;
strvec_pushv(&cp->args, d->args.v);
+ cp->in = d->in;
+ cp->no_stdin = d->no_stdin;
if (err)
strbuf_addstr(err, "preloaded output of a child\n");
else
fprintf(stderr, "preloaded output of a child\n");
number_callbacks++;
+
+ /* test_stdin callback will use this to count remaining lines */
+ *task_cb = xmalloc(sizeof(int));
+ *(int*)(*task_cb) = 2;
+
return 1;
}
@@ -51,18 +58,61 @@ static int no_job(struct child_process *cp UNUSED,
return 0;
}
+static void test_consume_sideband(struct strbuf *output, void *cb UNUSED)
+{
+ FILE *sideband;
+
+ sideband = fopen("./sideband", "a");
+
+ strbuf_write(output, sideband);
+ fclose(sideband);
+}
+
static int task_finished(int result UNUSED,
struct strbuf *err,
void *pp_cb UNUSED,
- void *pp_task_cb UNUSED)
+ void *pp_task_cb)
{
if (err)
strbuf_addstr(err, "asking for a quick stop\n");
else
fprintf(stderr, "asking for a quick stop\n");
+ if (pp_task_cb)
+ FREE_AND_NULL(pp_task_cb);
return 1;
}
+static int task_finished_quiet(int result UNUSED,
+ struct strbuf *err UNUSED,
+ void *pp_cb UNUSED,
+ void *pp_task_cb)
+{
+ if (pp_task_cb)
+ FREE_AND_NULL(pp_task_cb);
+ return 0;
+}
+
+static int test_stdin_pipe_feed(int hook_stdin_fd, void *cb UNUSED, void *task_cb)
+{
+ int *lines_remaining = task_cb;
+
+ if (*lines_remaining) {
+ struct strbuf buf = STRBUF_INIT;
+ strbuf_addf(&buf, "sample stdin %d\n", --(*lines_remaining));
+ if (write_in_full(hook_stdin_fd, buf.buf, buf.len) < 0) {
+ if (errno == EPIPE) {
+ /* child closed stdin, nothing more to do */
+ strbuf_release(&buf);
+ return 1;
+ }
+ die_errno("write");
+ }
+ strbuf_release(&buf);
+ }
+
+ return !(*lines_remaining);
+}
+
struct testsuite {
struct string_list tests, failed;
int next;
@@ -157,6 +207,8 @@ static int testsuite(int argc, const char **argv)
struct run_process_parallel_opts opts = {
.get_next_task = next_test,
.start_failure = test_failed,
+ .feed_pipe = test_stdin_pipe_feed,
+ .consume_sideband = test_consume_sideband,
.task_finished = test_finished,
.data = &suite,
};
@@ -460,12 +512,23 @@ int cmd__run_command(int argc, const char **argv)
if (!strcmp(argv[1], "run-command-parallel")) {
opts.get_next_task = parallel_next;
+ opts.task_finished = task_finished_quiet;
} else if (!strcmp(argv[1], "run-command-abort")) {
opts.get_next_task = parallel_next;
opts.task_finished = task_finished;
} else if (!strcmp(argv[1], "run-command-no-jobs")) {
opts.get_next_task = no_job;
opts.task_finished = task_finished;
+ } else if (!strcmp(argv[1], "run-command-stdin")) {
+ proc.in = -1;
+ proc.no_stdin = 0;
+ opts.get_next_task = parallel_next;
+ opts.task_finished = task_finished_quiet;
+ opts.feed_pipe = test_stdin_pipe_feed;
+ } else if (!strcmp(argv[1], "run-command-sideband")) {
+ opts.get_next_task = parallel_next;
+ opts.consume_sideband = test_consume_sideband;
+ opts.task_finished = task_finished_quiet;
} else {
ret = 1;
fprintf(stderr, "check usage\n");
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 76d4936a87..f133d71783 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -164,6 +164,44 @@ test_expect_success 'run_command runs ungrouped in parallel with more tasks than
test_line_count = 4 err
'
+test_expect_success 'run_command can divert output' '
+ test_when_finished rm sideband &&
+ test-tool run-command run-command-sideband 3 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+ test_must_be_empty actual &&
+ test_cmp expect sideband
+'
+
+test_expect_success 'run_command listens to stdin' '
+ cat >expect <<-\EOF &&
+ preloaded output of a child
+ listening for stdin:
+ sample stdin 1
+ sample stdin 0
+ preloaded output of a child
+ listening for stdin:
+ sample stdin 1
+ sample stdin 0
+ preloaded output of a child
+ listening for stdin:
+ sample stdin 1
+ sample stdin 0
+ preloaded output of a child
+ listening for stdin:
+ sample stdin 1
+ sample stdin 0
+ EOF
+
+ write_script stdin-script <<-\EOF &&
+ echo "listening for stdin:"
+ while read line
+ do
+ echo "$line"
+ done
+ EOF
+ test-tool run-command run-command-stdin 2 ./stdin-script 2>actual &&
+ test_cmp expect actual
+'
+
cat >expect <<-EOF
preloaded output of a child
asking for a quick stop
diff --git a/transport.c b/transport.c
index c7f06a7382..67368754bf 100644
--- a/transport.c
+++ b/transport.c
@@ -1316,65 +1316,56 @@ static void die_with_unpushed_submodules(struct string_list *needs_pushing)
die(_("Aborting."));
}
-static int run_pre_push_hook(struct transport *transport,
- struct ref *remote_refs)
+static int pre_push_hook_feed_stdin(int hook_stdin_fd, void *pp_cb, void *pp_task_cb UNUSED)
{
- int ret = 0, x;
- struct ref *r;
- struct child_process proc = CHILD_PROCESS_INIT;
- struct strbuf buf;
- const char *hook_path = find_hook(the_repository, "pre-push");
+ struct hook_cb_data *hook_cb = pp_cb;
+ struct ref *r = hook_cb->options->feed_pipe_ctx;
+ struct strbuf *buf = hook_cb->options->feed_pipe_cb_data;
+ int ret = 0;
- if (!hook_path)
- return 0;
+ if (!r)
+ return 1; /* no more refs */
- strvec_push(&proc.args, hook_path);
- strvec_push(&proc.args, transport->remote->name);
- strvec_push(&proc.args, transport->url);
+ if (!buf)
+ BUG("pipe_task_cb must contain a valid strbuf");
- proc.in = -1;
- proc.trace2_hook_name = "pre-push";
+ hook_cb->options->feed_pipe_ctx = r->next;
+ strbuf_reset(buf);
- if (start_command(&proc)) {
- finish_command(&proc);
- return -1;
- }
-
- sigchain_push(SIGPIPE, SIG_IGN);
+ if (!r->peer_ref) return 0;
+ if (r->status == REF_STATUS_REJECT_NONFASTFORWARD) return 0;
+ if (r->status == REF_STATUS_REJECT_STALE) return 0;
+ if (r->status == REF_STATUS_REJECT_REMOTE_UPDATED) return 0;
+ if (r->status == REF_STATUS_UPTODATE) return 0;
- strbuf_init(&buf, 256);
+ strbuf_addf(buf, "%s %s %s %s\n",
+ r->peer_ref->name, oid_to_hex(&r->new_oid),
+ r->name, oid_to_hex(&r->old_oid));
- for (r = remote_refs; r; r = r->next) {
- if (!r->peer_ref) continue;
- if (r->status == REF_STATUS_REJECT_NONFASTFORWARD) continue;
- if (r->status == REF_STATUS_REJECT_STALE) continue;
- if (r->status == REF_STATUS_REJECT_REMOTE_UPDATED) continue;
- if (r->status == REF_STATUS_UPTODATE) continue;
+ ret = write_in_full(hook_stdin_fd, buf->buf, buf->len);
+ if (ret < 0 && errno != EPIPE)
+ return ret; /* We do not mind if a hook does not read all refs. */
- strbuf_reset(&buf);
- strbuf_addf( &buf, "%s %s %s %s\n",
- r->peer_ref->name, oid_to_hex(&r->new_oid),
- r->name, oid_to_hex(&r->old_oid));
+ return 0;
+}
- if (write_in_full(proc.in, buf.buf, buf.len) < 0) {
- /* We do not mind if a hook does not read all refs. */
- if (errno != EPIPE)
- ret = -1;
- break;
- }
- }
+static int run_pre_push_hook(struct transport *transport,
+ struct ref *remote_refs)
+{
+ struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
+ struct strbuf buf = STRBUF_INIT;
+ int ret = 0;
- strbuf_release(&buf);
+ strvec_push(&opt.args, transport->remote->name);
+ strvec_push(&opt.args, transport->url);
- x = close(proc.in);
- if (!ret)
- ret = x;
+ opt.feed_pipe = pre_push_hook_feed_stdin;
+ opt.feed_pipe_ctx = remote_refs;
+ opt.feed_pipe_cb_data = &buf;
- sigchain_pop(SIGPIPE);
+ ret = run_hooks_opt(the_repository, "pre-push", &opt);
- x = finish_command(&proc);
- if (!ret)
- ret = x;
+ strbuf_release(&buf);
return ret;
}