blob: eff832515dace062ad0b3c17684d04e2368dde0a [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
#define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
#include <grpc/support/port_platform.h>
#include <grpc/slice_buffer.h>
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h"
/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
* compression for the message */
#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
/** Mask of all valid internal flags. */
#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
namespace grpc_core {
class ByteStream : public Orphanable {
public:
virtual ~ByteStream() {}
// Returns true if the bytes are available immediately (in which case
// on_complete will not be called), or false if the bytes will be available
// asynchronously (in which case on_complete will be called when they
// are available).
//
// max_size_hint can be set as a hint as to the maximum number
// of bytes that would be acceptable to read.
virtual bool Next(size_t max_size_hint,
grpc_closure* on_complete) GRPC_ABSTRACT;
// Returns the next slice in the byte stream when it is available, as
// indicated by Next().
//
// Once a slice is returned into *slice, it is owned by the caller.
virtual grpc_error* Pull(grpc_slice* slice) GRPC_ABSTRACT;
// Shuts down the byte stream.
//
// If there is a pending call to on_complete from Next(), it will be
// invoked with the error passed to Shutdown().
//
// The next call to Pull() (if any) will return the error passed to
// Shutdown().
virtual void Shutdown(grpc_error* error) GRPC_ABSTRACT;
uint32_t length() const { return length_; }
uint32_t flags() const { return flags_; }
void set_flags(uint32_t flags) { flags_ = flags; }
GRPC_ABSTRACT_BASE_CLASS
protected:
ByteStream(uint32_t length, uint32_t flags)
: length_(length), flags_(flags) {}
private:
const uint32_t length_;
uint32_t flags_;
};
//
// SliceBufferByteStream
//
// A ByteStream that wraps a slice buffer.
//
class SliceBufferByteStream : public ByteStream {
public:
// Removes all slices in slice_buffer, leaving it empty.
SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags);
~SliceBufferByteStream();
void Orphan() override;
bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
grpc_error* Pull(grpc_slice* slice) override;
void Shutdown(grpc_error* error) override;
private:
grpc_slice_buffer backing_buffer_;
size_t cursor_ = 0;
grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
};
//
// CachingByteStream
//
// A ByteStream that that wraps an underlying byte stream but caches
// the resulting slices in a slice buffer. If an initial attempt fails
// without fully draining the underlying stream, a new caching stream
// can be created from the same underlying cache, in which case it will
// return whatever is in the backing buffer before continuing to read the
// underlying stream.
//
// NOTE: No synchronization is done, so it is not safe to have multiple
// CachingByteStreams simultaneously drawing from the same underlying
// ByteStreamCache at the same time.
//
class ByteStreamCache {
public:
class CachingByteStream : public ByteStream {
public:
explicit CachingByteStream(ByteStreamCache* cache);
~CachingByteStream();
void Orphan() override;
bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
grpc_error* Pull(grpc_slice* slice) override;
void Shutdown(grpc_error* error) override;
// Resets the byte stream to the start of the underlying stream.
void Reset();
private:
ByteStreamCache* cache_;
size_t cursor_ = 0;
size_t offset_ = 0;
grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
};
explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream);
~ByteStreamCache();
// Must not be destroyed while still in use by a CachingByteStream.
void Destroy();
grpc_slice_buffer* cache_buffer() { return &cache_buffer_; }
private:
OrphanablePtr<ByteStream> underlying_stream_;
uint32_t length_;
uint32_t flags_;
grpc_slice_buffer cache_buffer_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */