Skip to content

Commit

Permalink
journald: rework pid change handling
Browse files Browse the repository at this point in the history
Let's introduce an explicit line ending marker for line endings due to
pid change.

Let's also make sure we don't get confused with buffer management.

Fixes: #15654
(cherry picked from commit 45ba1ea)

Resolves: #2029426
  • Loading branch information
poettering authored and systemd-rhel-bot committed Feb 27, 2023
1 parent cd85a65 commit 538bd9b
Showing 1 changed file with 66 additions and 33 deletions.
99 changes: 66 additions & 33 deletions src/journal/journald-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef enum LineBreak {
LINE_BREAK_NUL,
LINE_BREAK_LINE_MAX,
LINE_BREAK_EOF,
LINE_BREAK_PID_CHANGE,
_LINE_BREAK_MAX,
_LINE_BREAK_INVALID = -1,
} LineBreak;
Expand Down Expand Up @@ -310,6 +311,7 @@ static int stdout_stream_log(
[LINE_BREAK_NUL] = "_LINE_BREAK=nul",
[LINE_BREAK_LINE_MAX] = "_LINE_BREAK=line-max",
[LINE_BREAK_EOF] = "_LINE_BREAK=eof",
[LINE_BREAK_PID_CHANGE] = "_LINE_BREAK=pid-change",
};

const char *c = line_break_field_table[line_break];
Expand Down Expand Up @@ -431,77 +433,97 @@ static int stdout_stream_line(StdoutStream *s, char *p, LineBreak line_break) {
assert_not_reached("Unknown stream state");
}

static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
char *p;
size_t remaining;
static int stdout_stream_found(
StdoutStream *s,
char *p,
size_t l,
LineBreak line_break) {

char saved;
int r;

assert(s);
assert(p);

/* Let's NUL terminate the specified buffer for this call, and revert back afterwards */
saved = p[l];
p[l] = 0;
r = stdout_stream_line(s, p, line_break);
p[l] = saved;

p = s->buffer;
remaining = s->length;
return r;
}

static int stdout_stream_scan(
StdoutStream *s,
char *p,
size_t remaining,
LineBreak force_flush,
size_t *ret_consumed) {

/* XXX: This function does nothing if (s->length == 0) */
size_t consumed = 0;
int r;

assert(s);
assert(p);

for (;;) {
LineBreak line_break;
size_t skip;
size_t skip, found;
char *end1, *end2;

end1 = memchr(p, '\n', remaining);
end2 = memchr(p, 0, end1 ? (size_t) (end1 - p) : remaining);

if (end2) {
/* We found a NUL terminator */
skip = end2 - p + 1;
found = end2 - p;
skip = found + 1;
line_break = LINE_BREAK_NUL;
} else if (end1) {
/* We found a \n terminator */
*end1 = 0;
skip = end1 - p + 1;
found = end1 - p;
skip = found + 1;
line_break = LINE_BREAK_NEWLINE;
} else if (remaining >= s->server->line_max) {
/* Force a line break after the maximum line length */
*(p + s->server->line_max) = 0;
skip = remaining;
found = skip = s->server->line_max;
line_break = LINE_BREAK_LINE_MAX;
} else
break;

r = stdout_stream_line(s, p, line_break);
r = stdout_stream_found(s, p, found, line_break);
if (r < 0)
return r;

remaining -= skip;
p += skip;
consumed += skip;
remaining -= skip;
}

if (force_flush && remaining > 0) {
p[remaining] = 0;
r = stdout_stream_line(s, p, LINE_BREAK_EOF);
if (force_flush >= 0 && remaining > 0) {
r = stdout_stream_found(s, p, remaining, force_flush);
if (r < 0)
return r;

p += remaining;
remaining = 0;
consumed += remaining;
}

if (p > s->buffer) {
memmove(s->buffer, p, remaining);
s->length = remaining;
}
if (ret_consumed)
*ret_consumed = consumed;

return 0;
}

static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
uint8_t buf[CMSG_SPACE(sizeof(struct ucred))];
StdoutStream *s = userdata;
size_t limit, consumed;
struct ucred *ucred = NULL;
struct cmsghdr *cmsg;
struct iovec iovec;
size_t limit;
ssize_t l;
char *p;
int r;

struct msghdr msghdr = {
Expand Down Expand Up @@ -529,7 +551,7 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
/* Try to make use of the allocated buffer in full, but never read more than the configured line size. Also,
* always leave room for a terminating NUL we might need to add. */
limit = MIN(s->allocated - 1, s->server->line_max);

assert(s->length <= limit);
iovec = IOVEC_MAKE(s->buffer + s->length, limit - s->length);

l = recvmsg(s->fd, &msghdr, MSG_DONTWAIT|MSG_CMSG_CLOEXEC);
Expand All @@ -543,7 +565,7 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
cmsg_close_all(&msghdr);

if (l == 0) {
stdout_stream_scan(s, true);
(void) stdout_stream_scan(s, s->buffer, s->length, /* force_flush = */ LINE_BREAK_EOF, NULL);
goto terminate;
}

Expand All @@ -562,22 +584,33 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
* in the meantime.
*/
if (ucred && ucred->pid != s->ucred.pid) {
/* force out any previously half-written lines from a
* different process, before we switch to the new ucred
* structure for everything we just added */
r = stdout_stream_scan(s, true);
/* Force out any previously half-written lines from a different process, before we switch to
* the new ucred structure for everything we just added */
r = stdout_stream_scan(s, s->buffer, s->length, /* force_flush = */ LINE_BREAK_PID_CHANGE, NULL);
if (r < 0)
goto terminate;

s->ucred = *ucred;
s->context = client_context_release(s->server, s->context);

p = s->buffer + s->length;
} else {
p = s->buffer;
l += s->length;
}

s->length += l;
r = stdout_stream_scan(s, false);
/* Always copy in the new credentials */
if (ucred)
s->ucred = *ucred;

r = stdout_stream_scan(s, p, l, _LINE_BREAK_INVALID, &consumed);
if (r < 0)
goto terminate;

/* Move what wasn't consumed to the front of the buffer */
assert(consumed <= (size_t) l);
s->length = l - consumed;
memmove(s->buffer, p + consumed, s->length);

return 1;

terminate:
Expand Down

0 comments on commit 538bd9b

Please sign in to comment.