Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#2029426) Rework PID change handling in journald #273

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions man/systemd.journal-fields.xml
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,16 @@
<varlistentry>
<term><varname>_LINE_BREAK=</varname></term>
<listitem>
<para>Only applies to <literal>_TRANSPORT=stdout</literal> records: indicates that the log message in the
standard output/error stream was not terminated with a normal newline character (<literal>\n</literal>,
i.e. ASCII 10). Specifically, when set this field is one of <option>nul</option> (in case the line was
terminated by a NUL byte), <option>line-max</option> (in case the maximum log line length was reached, as
configured with <varname>LineMax=</varname> in
<citerefentry><refentrytitle>journald.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>) or
<option>eof</option> (if this was the last log record of a stream and the stream ended without a final
newline character). Note that this record is not generated when a normal newline character was used for
marking the log line end.</para>
<para>Only applies to <literal>_TRANSPORT=stdout</literal> records: indicates that the log message
in the standard output/error stream was not terminated with a normal newline character
(<literal>\n</literal>, i.e. ASCII 10). Specifically, when set this field is one of
<option>nul</option> (in case the line was terminated by a NUL byte), <option>line-max</option> (in
case the maximum log line length was reached, as configured with <varname>LineMax=</varname> in
<citerefentry><refentrytitle>journald.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>),
<option>eof</option> (if this was the last log record of a stream and the stream ended without a
final newline character), or <option>pid-change</option> (if the process which generated the log
output changed in the middle of a line). Note that this record is not generated when a normal
newline character was used for marking the log line end.</para>
</listitem>
</varlistentry>
</variablelist>
Expand Down
135 changes: 89 additions & 46 deletions src/journal/journald-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ typedef enum LineBreak {
LINE_BREAK_NUL,
LINE_BREAK_LINE_MAX,
LINE_BREAK_EOF,
LINE_BREAK_PID_CHANGE,
_LINE_BREAK_MAX,
_LINE_BREAK_INVALID = -1,
} LineBreak;

struct StdoutStream {
Expand Down Expand Up @@ -233,7 +236,11 @@ static int stdout_stream_save(StdoutStream *s) {
return log_error_errno(r, "Failed to save stream data %s: %m", s->state_file);
}

static int stdout_stream_log(StdoutStream *s, const char *p, LineBreak line_break) {
static int stdout_stream_log(
StdoutStream *s,
const char *p,
LineBreak line_break) {

struct iovec *iovec;
int priority;
char syslog_priority[] = "PRIORITY=\0";
Expand All @@ -245,6 +252,9 @@ static int stdout_stream_log(StdoutStream *s, const char *p, LineBreak line_brea
assert(s);
assert(p);

assert(line_break >= 0);
assert(line_break < _LINE_BREAK_MAX);

if (s->context)
(void) client_context_maybe_refresh(s->server, s->context, NULL, NULL, 0, NULL, USEC_INFINITY);
else if (pid_is_valid(s->ucred.pid)) {
Expand Down Expand Up @@ -296,17 +306,20 @@ static int stdout_stream_log(StdoutStream *s, const char *p, LineBreak line_brea
iovec[n++] = IOVEC_MAKE_STRING(syslog_identifier);
}

if (line_break != LINE_BREAK_NEWLINE) {
const char *c;
static const char * const line_break_field_table[_LINE_BREAK_MAX] = {
[LINE_BREAK_NEWLINE] = NULL, /* Do not add field if traditional newline */
[LINE_BREAK_NUL] = "_LINE_BREAK=nul",
[LINE_BREAK_LINE_MAX] = "_LINE_BREAK=line-max",
[LINE_BREAK_EOF] = "_LINE_BREAK=eof",
[LINE_BREAK_PID_CHANGE] = "_LINE_BREAK=pid-change",
};

/* If this log message was generated due to an uncommon line break then mention this in the log
* entry */
const char *c = line_break_field_table[line_break];

c = line_break == LINE_BREAK_NUL ? "_LINE_BREAK=nul" :
line_break == LINE_BREAK_LINE_MAX ? "_LINE_BREAK=line-max" :
"_LINE_BREAK=eof";
/* If this log message was generated due to an uncommon line break then mention this in the log
* entry */
if (c)
iovec[n++] = IOVEC_MAKE_STRING(c);
}

message = strappend("MESSAGE=", p);
if (message)
Expand Down Expand Up @@ -420,77 +433,97 @@ static int stdout_stream_line(StdoutStream *s, char *p, LineBreak line_break) {
assert_not_reached("Unknown stream state");
}

static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
char *p;
size_t remaining;
static int stdout_stream_found(
StdoutStream *s,
char *p,
size_t l,
LineBreak line_break) {

char saved;
int r;

assert(s);
assert(p);

p = s->buffer;
remaining = s->length;
/* Let's NUL terminate the specified buffer for this call, and revert back afterwards */
saved = p[l];
p[l] = 0;
r = stdout_stream_line(s, p, line_break);
p[l] = saved;

/* XXX: This function does nothing if (s->length == 0) */
return r;
}

static int stdout_stream_scan(
StdoutStream *s,
char *p,
size_t remaining,
LineBreak force_flush,
size_t *ret_consumed) {

size_t consumed = 0;
int r;

assert(s);
assert(p);

for (;;) {
LineBreak line_break;
size_t skip;
size_t skip, found;
char *end1, *end2;

end1 = memchr(p, '\n', remaining);
end2 = memchr(p, 0, end1 ? (size_t) (end1 - p) : remaining);

if (end2) {
/* We found a NUL terminator */
skip = end2 - p + 1;
found = end2 - p;
skip = found + 1;
line_break = LINE_BREAK_NUL;
} else if (end1) {
/* We found a \n terminator */
*end1 = 0;
skip = end1 - p + 1;
found = end1 - p;
skip = found + 1;
line_break = LINE_BREAK_NEWLINE;
} else if (remaining >= s->server->line_max) {
/* Force a line break after the maximum line length */
*(p + s->server->line_max) = 0;
skip = remaining;
found = skip = s->server->line_max;
line_break = LINE_BREAK_LINE_MAX;
} else
break;

r = stdout_stream_line(s, p, line_break);
r = stdout_stream_found(s, p, found, line_break);
if (r < 0)
return r;

remaining -= skip;
p += skip;
consumed += skip;
remaining -= skip;
}

if (force_flush && remaining > 0) {
p[remaining] = 0;
r = stdout_stream_line(s, p, LINE_BREAK_EOF);
if (force_flush >= 0 && remaining > 0) {
r = stdout_stream_found(s, p, remaining, force_flush);
if (r < 0)
return r;

p += remaining;
remaining = 0;
consumed += remaining;
}

if (p > s->buffer) {
memmove(s->buffer, p, remaining);
s->length = remaining;
}
if (ret_consumed)
*ret_consumed = consumed;

return 0;
}

static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
uint8_t buf[CMSG_SPACE(sizeof(struct ucred))];
StdoutStream *s = userdata;
size_t limit, consumed;
struct ucred *ucred = NULL;
struct cmsghdr *cmsg;
struct iovec iovec;
size_t limit;
ssize_t l;
char *p;
int r;

struct msghdr msghdr = {
Expand All @@ -507,8 +540,8 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
goto terminate;
}

/* If the buffer is full already (discounting the extra NUL we need), add room for another 1K */
if (s->length + 1 >= s->allocated) {
/* If the buffer is almost full, add room for another 1K */
if (s->length + 512 >= s->allocated) {
if (!GREEDY_REALLOC(s->buffer, s->allocated, s->length + 1 + 1024)) {
log_oom();
goto terminate;
Expand All @@ -518,7 +551,7 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
/* Try to make use of the allocated buffer in full, but never read more than the configured line size. Also,
* always leave room for a terminating NUL we might need to add. */
limit = MIN(s->allocated - 1, s->server->line_max);

assert(s->length <= limit);
iovec = IOVEC_MAKE(s->buffer + s->length, limit - s->length);

l = recvmsg(s->fd, &msghdr, MSG_DONTWAIT|MSG_CMSG_CLOEXEC);
Expand All @@ -532,7 +565,7 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
cmsg_close_all(&msghdr);

if (l == 0) {
stdout_stream_scan(s, true);
(void) stdout_stream_scan(s, s->buffer, s->length, /* force_flush = */ LINE_BREAK_EOF, NULL);
goto terminate;
}

Expand All @@ -551,23 +584,33 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
* in the meantime.
*/
if (ucred && ucred->pid != s->ucred.pid) {
/* force out any previously half-written lines from a
* different process, before we switch to the new ucred
* structure for everything we just added */
r = stdout_stream_scan(s, true);
/* Force out any previously half-written lines from a different process, before we switch to
* the new ucred structure for everything we just added */
r = stdout_stream_scan(s, s->buffer, s->length, /* force_flush = */ LINE_BREAK_PID_CHANGE, NULL);
if (r < 0)
goto terminate;

s->ucred = *ucred;
client_context_release(s->server, s->context);
s->context = NULL;
s->context = client_context_release(s->server, s->context);

p = s->buffer + s->length;
} else {
p = s->buffer;
l += s->length;
}

s->length += l;
r = stdout_stream_scan(s, false);
/* Always copy in the new credentials */
if (ucred)
s->ucred = *ucred;

r = stdout_stream_scan(s, p, l, _LINE_BREAK_INVALID, &consumed);
if (r < 0)
goto terminate;

/* Move what wasn't consumed to the front of the buffer */
assert(consumed <= (size_t) l);
s->length = l - consumed;
memmove(s->buffer, p + consumed, s->length);

return 1;

terminate:
Expand Down
15 changes: 15 additions & 0 deletions test/TEST-04-JOURNAL/test-journal.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ journalctl -b -o export -t "$ID" --output-fields=_PID | grep '^_PID=' >/output
grep -q "^_PID=$PID" /output
grep -vq "^_PID=$PID" /output

# https://github.com/systemd/systemd/issues/15654
ID=$(journalctl --new-id128 | sed -n 2p)
printf "This will\nusually fail\nand be truncated\n">/expected
systemd-cat -t "$ID" /bin/sh -c 'env echo -n "This will";echo;env echo -n "usually fail";echo;env echo -n "and be truncated";echo;'
journalctl --sync
journalctl -b -o cat -t "$ID" >/output
cmp /expected /output
[[ $(journalctl -b -o export -t "$ID" --output-fields=_TRANSPORT | grep -Pc "^_TRANSPORT=stdout$") -eq 3 ]]
[[ $(journalctl -b -o export -t "$ID" --output-fields=_LINE_BREAK | grep -Pc "^_LINE_BREAK=pid-change$") -eq 3 ]]
[[ $(journalctl -b -o export -t "$ID" --output-fields=_PID | sort -u | grep -c "^_PID=.*$") -eq 3 ]]
[[ $(journalctl -b -o export -t "$ID" --output-fields=MESSAGE | grep -Pc "^MESSAGE=(This will|usually fail|and be truncated)$") -eq 3 ]]

# Add new tests before here, the journald restarts below
# may make tests flappy.

# Don't lose streams on restart
systemctl start forever-print-hola
sleep 3
Expand Down