blob: 051f218515a04d1d900457e6abbcc3872929def8 [file] [log] [blame]
// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.util.io;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
import java.io.IOException;
import java.io.OutputStream;
/**
* The dual of {@link StreamMultiplexer}: This is an output stream into which
* you can dump the multiplexed stream, and it delegates the de-multiplexed
* content back into separate channels (instances of {@link OutputStream}).
*
* The format of the tagged output stream is as follows:
*
* <pre>
* combined :: = [ control_line payload ... ]+
* control_line :: = '@' marker '@'? '\n'
* payload :: = r'^[^\n]*\n'
* </pre>
*
* For more details, please see {@link StreamMultiplexer}.
*/
@ThreadCompatible
public final class StreamDemultiplexer extends OutputStream {
@Override
public void close() throws IOException {
flush();
}
@Override
public void flush() throws IOException {
if (selectedStream != null) {
selectedStream.flush();
}
}
private static final byte AT = '@';
private static final byte NEWLINE = '\n';
/**
* The output streams, conveniently in an array indexed by the marker byte.
* Some of these will be null, most likely.
*/
private final OutputStream[] outputStreams =
new OutputStream[Byte.MAX_VALUE + 1];
/**
* Each state in this FSM corresponds to a position in the grammar, which is
* simple enough that we can just move through it from beginning to end as we
* parse things.
*/
private enum State {
EXPECT_CONTROL_STARTING_AT,
EXPECT_MARKER_BYTE,
EXPECT_AT_OR_NEWLINE,
EXPECT_PAYLOAD_OR_NEWLINE
}
private State state = State.EXPECT_CONTROL_STARTING_AT;
private boolean addNewlineToPayload;
private OutputStream selectedStream;
/**
* Construct a new demultiplexer. The {@code smallestMarkerByte} indicates
* the marker byte we would expect for {@code outputStreams[0]} to be used.
* So, if this first stream is your stdout and you're using the
* {@link StreamMultiplexer}, then you will need to set this to
* {@code '1'}. Because {@link StreamDemultiplexer} extends
* {@link OutputStream}, this constructor effectively creates an
* {@link OutputStream} instance which demultiplexes the tagged data client
* code writes to it into {@code outputStreams}.
*/
public StreamDemultiplexer(byte smallestMarkerByte,
OutputStream... outputStreams) {
for (int i = 0; i < outputStreams.length; i++) {
this.outputStreams[smallestMarkerByte + i] = outputStreams[i];
}
}
@Override
public void write(int b) throws IOException {
// This dispatch traverses the finite state machine / grammar.
switch (state) {
case EXPECT_CONTROL_STARTING_AT:
parseControlStartingAt((byte) b);
resetFields();
break;
case EXPECT_MARKER_BYTE:
parseMarkerByte((byte) b);
break;
case EXPECT_AT_OR_NEWLINE:
parseAtOrNewline((byte) b);
break;
case EXPECT_PAYLOAD_OR_NEWLINE:
parsePayloadOrNewline((byte) b);
break;
}
}
/**
* Handles {@link State#EXPECT_PAYLOAD_OR_NEWLINE}, which is the payload
* we are actually transporting over the wire. At this point we can rely
* on a stream having been preselected into {@link #selectedStream}, and
* also we will add a newline if {@link #addNewlineToPayload} is set.
* Flushes at the end of every payload segment.
*/
private void parsePayloadOrNewline(byte b) throws IOException {
if (b == NEWLINE) {
if (addNewlineToPayload) {
selectedStream.write(NEWLINE);
}
selectedStream.flush();
state = State.EXPECT_CONTROL_STARTING_AT;
} else {
selectedStream.write(b);
selectedStream.flush(); // slow?
}
}
/**
* Handles {@link State#EXPECT_AT_OR_NEWLINE}, which is either the
* suppress newline indicator (at) at the end of a control line, or the end
* of a control line.
*/
private void parseAtOrNewline(byte b) throws IOException {
if (b == NEWLINE) {
state = State.EXPECT_PAYLOAD_OR_NEWLINE;
} else if (b == AT) {
addNewlineToPayload = false;
} else {
throw new IOException("Expected @ or \\n. (" + b + ")");
}
}
/**
* Reset the fields that are affected by our state.
*/
private void resetFields() {
selectedStream = null;
addNewlineToPayload = true;
}
/**
* Handles {@link State#EXPECT_MARKER_BYTE}. The byte determines which stream
* we will be using, and will set {@link #selectedStream}.
*/
private void parseMarkerByte(byte markerByte) throws IOException {
if (markerByte < 0 || markerByte > Byte.MAX_VALUE) {
String msg = "Illegal marker byte (" + markerByte + ")";
throw new IllegalArgumentException(msg);
}
if (markerByte > outputStreams.length
|| outputStreams[markerByte] == null) {
throw new IOException("stream " + markerByte + " not registered.");
}
selectedStream = outputStreams[markerByte];
state = State.EXPECT_AT_OR_NEWLINE;
}
/**
* Handles {@link State#EXPECT_CONTROL_STARTING_AT}, the very first '@' with
* which each message starts.
*/
private void parseControlStartingAt(byte b) throws IOException {
if (b != AT) {
throw new IOException("Expected control starting @. (" + b + ", "
+ (char) b + ")");
}
state = State.EXPECT_MARKER_BYTE;
}
}