C++缓冲区RingBuffer

Tcp缓存

通过tcp epoll非阻塞发送请求时,为了提高吞吐量,高性能发送请求,一般会做个发送缓存和接收缓存。即将发送的请求可以在发送缓存里聚合,接收到的响应在接收缓存里聚合,减少系统调用次数。

算法

将一个大的buffer,通过begin和end两个标记连成环状

  • 如果end > begin,则它是一个连续的buffer有数据 [begin, end)
  • 如果end < begin,则它是两段连续的buffer有数据 [0, end) [begin, cap)
  • 如果end == begin,则整个buffer可能满,可能空,通过empty标记判断

C++代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class RingBuffer {
public:
RingBuffer() {}

~RingBuffer() { Fini(); }

/// @brief 释放内存
void Fini() {
if (buf_ != nullptr) {
delete[] buf_;
buf_ = nullptr;
}
begin_ = 0;
end_ = 0;
empty_ = true;
cap_ = 0;
}

/// @brief 初始化申请内存
/// @param min_cap 最小大小
/// @param max_cap 最大大小
/// @param logger 日志句柄
/// @return 错误码
Status Init(uint32_t min_cap, uint32_t max_cap, std::shared_ptr<Logger> logger);

/// @brief 获取当前buffer容量
/// @return 当前buffer容量
uint32_t GetCapSize() { return cap_; }

/// @brief 获取当前已使用大小
/// @return 已使用大小
uint32_t GetUsedSize() {
if (empty_) {
return 0;
}
if (end_ > begin_) {
return (end_ - begin_);
}
return (cap_ - begin_ + end_);
}

/// @brief 获取剩余可写入的数据大小容量
/// @return 可写入容量
uint32_t GetLeftSize() { return (cap_ - GetUsedSize()); }

/// @brief 清空数据,重置数据游标
void Clear() {
begin_ = 0;
end_ = 0;
empty_ = true;
}

/// @brief 是否有数据
/// @return bool
bool Empty() { return empty_; }

/// @brief 是否满
/// @return bool
bool Full() { return (GetUsedSize() == cap_); }

/// @brief 获取可写数据的buffer, 由于数据是可能是分段的,一个完整的buffer可能分为两段,所以用向量数组
/// @param iovc 向量数组,可写入的内存,外部传入释放
/// @param count 向量数组大小,入参要>=2
/// @return 错误码
Status GetLeftBuffer(struct iovec* iovc, uint32_t& count);

/// @brief 获取已使用的数据的buffer, 由于数据是可能是分段的,一个完整的buffer可能分为两段,所以用向量数组
/// @param iovc 向量数组,已使用的内存,外部传入释放
/// @param count 向量数组大小,入参要>=2
/// @return 错误码
Status GetUsedBuffer(struct iovec* iovc, uint32_t& count);

/// @brief 按偏移和大小读取向量数组中的buffer, 如果需要的内存大小跨段了,则拷贝到copy_buffer中;没有跨段则是zerocopy
/// @param offset 偏移
/// @param data_buffer_ 数据指针应用,指向真正的数据,内存本结构管理
/// @param size 需要的大小
/// @param copy_buffer 如果需要的内存大小跨段了,则拷贝到这个buffer中,内存外部释放
/// @return 错误码
Status ReadBuffer(uint32_t offset,
const char*& data_buffer_,
uint32_t size,
char* copy_buffer,
std::shared_ptr<Logger> logger = nullptr) {
if (size == 0) {
return {};
}
if (UNLIKELY(buf_ == nullptr)) {
RETURN_ERR_NO_LOG(ErrorCode_Code_kParamInvalid, "buf_ == nullptr");
}
if (UNLIKELY(copy_buffer == nullptr)) {
RETURN_ERR_NO_LOG(ErrorCode_Code_kParamInvalid, "copy_buffer == nullptr");
}
if (offset + size > GetUsedSize()) {
RETURN_ERR_NO_LOG(
ErrorCode_Code_kDecodeError, "offset {} + size {} > total_len_ {}", offset, size, GetUsedSize());
}
TRACE_LOG(logger, "read offset {} size {}", offset, size);

uint32_t begin_pos = (begin_ + offset) % cap_;
// 没有跨段
if (begin_pos + size <= cap_) {
data_buffer_ = buf_ + begin_pos;
return {};
}

// 跨段,拷贝读取
uint32_t cp_len = cap_ - begin_pos;
memcpy(copy_buffer, buf_ + begin_pos, cp_len);
memcpy(copy_buffer + cp_len, buf_, size - cp_len);
data_buffer_ = copy_buffer;
return {};
}

/// @brief 读取数据后,移动begin的数据游标
/// @param len 读取的数据大小
Status ShiftReadPos(uint32_t len) {
if (UNLIKELY(buf_ == nullptr)) {
RETURN_ERR_NO_LOG(ErrorCode_Code_kParamInvalid, "buf_ == nullptr");
}
if (UNLIKELY(len <= 0)) {
return {};
}
uint32_t used_size = GetUsedSize();
if (UNLIKELY(len > used_size)) {
RETURN_ERR_NO_LOG(ErrorCode_Code_kParamInvalid, "read len {} > GetUsedSize {}", len, used_size);
}

if (len == used_size) {
Clear();
return {};
}
begin_ += len;
if (begin_ >= cap_) {
begin_ -= cap_;
}
return {};
}

/// @brief 按偏移和大小写入数据
/// @param offset 偏移
/// @param data 数据指针,外部释放
/// @param size 数据的大小
/// @return 错误码
Status WriteBuffer(uint32_t offset, const char* data, uint32_t size) {
if (size == 0) {
return {};
}
if (UNLIKELY(buf_ == nullptr)) {
RETURN_ERR_NO_LOG(ErrorCode_Code_kParamInvalid, "buf_ == nullptr");
}
if (offset + size > GetLeftSize()) {
RETURN_ERR_NO_LOG(
ErrorCode_Code_kDecodeError, "offset {} + size {} > total_len_ {}", offset, size, GetLeftSize());
}
uint32_t end_pos = (end_ + offset) % cap_;
// 没有跨段
if (end_pos + size <= cap_) {
memcpy(buf_ + end_pos, data, size);
return {};
}

// 跨段
uint32_t cp_len = cap_ - end_pos;
memcpy(buf_ + end_pos, data, cp_len);
memcpy(buf_, data + cp_len, size - cp_len);
return {};
}

/// @brief 写入数据后,移动end的数据游标
/// @param len 写入数据的大小
Status ShiftWritePos(uint32_t len) {
if (UNLIKELY(buf_ == nullptr)) {
RETURN_ERR_NO_LOG(ErrorCode_Code_kParamInvalid, "buf_ == nullptr");
}
if (UNLIKELY(len <= 0)) {
return {};
}
if (UNLIKELY(len > GetLeftSize())) {
RETURN_ERR_NO_LOG(
ErrorCode_Code_kParamInvalid, "write len {} > GetLeftSize {}", len, GetLeftSize());
}

empty_ = false;
end_ += len;
if (end_ >= cap_) {
end_ -= cap_;
}
return {};
}

/// @brief 扩容ringbuffer
/// @param need_size 需要写入的数据大小,如果传入0,则按倍数扩
/// @return 错误码
Status Extend(uint32_t need_size = 0);

private:
char* buf_ = nullptr;
uint32_t cap_ = 0; // buffer大小
uint32_t min_cap_ = 0; // 最小容量
uint32_t max_cap_ = 0; // 最大容量,当写入数据大于cap时,自动扩容

uint32_t begin_ = 0; // 数据起始位置
uint32_t end_ = 0; // 数据结束位置
bool empty_ = true; // 当begin == end 时,通过empty_ 判断是空还是满

std::shared_ptr<Logger> logger_ = nullptr;
};

CPP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#include "ring_buffer.h"

Status RingBuffer::Init(uint32_t min_cap, uint32_t max_cap, std::shared_ptr<Logger> logger) {
logger_ = logger;
if (min_cap > max_cap) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "min_cap > max_cap");
}

if (min_cap <= 0) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "min_cap <= 0");
}
Fini();
min_cap_ = min_cap;
max_cap_ = max_cap;
buf_ = new char[min_cap_];
if (buf_ == nullptr) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "new buffer failed, size {}", min_cap_);
}
cap_ = min_cap_;
DEBUG_LOG(logger_, "min_cap_ {} max_cap_ {}", min_cap_, max_cap_);
return {};
}

Status RingBuffer::GetLeftBuffer(struct iovec* iovc, uint32_t& count) {
if (UNLIKELY(iovc == nullptr)) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "iovc == nullptr");
}
if (UNLIKELY(count < 2)) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "count < 2, iovc invalid");
}
if (UNLIKELY(buf_ == nullptr)) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "buf_ == nullptr");
}
if (UNLIKELY(Full())) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "buffer not enough");
}
if (UNLIKELY(end_ > cap_)) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "end_ > cap_, buffer invalid");
}
count = 0;

if (empty_) {
Clear();
iovc[0].iov_base = buf_;
iovc[0].iov_len = cap_;
count = 1;
return {};
}

// 一段
if (end_ <= begin_) {
iovc[0].iov_base = buf_ + end_;
iovc[0].iov_len = begin_ - end_;
count = 1;
return {};
}

// 一般不存在end_ == cap_;当end_ == cap_,重置end_=0
iovc[0].iov_base = buf_ + end_;
iovc[0].iov_len = cap_ - end_;
count = 1;
// 两段
if (begin_ != 0) {
iovc[1].iov_base = buf_;
iovc[1].iov_len = begin_;
count = 2;
}
return {};
}

Status RingBuffer::GetUsedBuffer(struct iovec* iovc, uint32_t& count) {
if (UNLIKELY(iovc == nullptr)) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "iovc == nullptr");
}
if (UNLIKELY(count < 2)) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "count < 2, iovc invalid");
}
if (UNLIKELY(buf_ == nullptr)) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "buf_ == nullptr");
}

count = 0;
if (empty_) {
return {};
}

// 一段
if (end_ > begin_) {
iovc[0].iov_base = buf_ + begin_;
iovc[0].iov_len = (end_ - begin_);
count = 1;
return {};
}

// 一般不存在begin_ == cap_;当begin_ == cap_,重置end_=0
iovc[0].iov_base = buf_ + begin_;
iovc[0].iov_len = (cap_ - begin_);
count = 1;
// 两段
if (end_ != 0) {
iovc[1].iov_base = buf_;
iovc[1].iov_len = end_;
count = 2;
}
return {};
}

Status RingBuffer::Extend(uint32_t need_size) {
if (UNLIKELY(buf_ == nullptr)) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "buf_ == nullptr");
}
if (UNLIKELY(need_size < 0)) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "need_size {} < 0", need_size);
}

uint32_t new_cap = 0;
if (need_size == 0) {
if (cap_ >= max_cap_) {
return {};
}
// 自动扩容,翻倍
new_cap = 2 * cap_;
if (new_cap > max_cap_) {
new_cap = max_cap_;
}
} else if (need_size > 0) {
// 指定大小扩容
uint32_t left_size = GetLeftSize();
if (left_size >= need_size) {
return {};
}
new_cap = cap_ + (need_size - left_size);
if (new_cap > max_cap_) {
RETURN_ERR(logger_,
ErrorCode_Code_kParamInvalid,
"ExtendCap failed new_cap {} > max_cap_ {} left size {} need size {}",
new_cap,
max_cap_,
left_size,
need_size);
}
}

DEBUG_LOG(logger_, "extend buffer need_size {} left_size {} from {} to {}", need_size, GetLeftSize(), cap_, new_cap);
// extend
char* new_buff = new char[new_cap];
if (new_buff == nullptr) {
RETURN_ERR(logger_, ErrorCode_Code_kParamInvalid, "new buffer failed, size {}", new_cap);
}

if (empty_) {
delete[] buf_;
buf_ = new_buff;
cap_ = new_cap;
Clear();
return {};
}

// 一段
if (end_ > begin_) {
memcpy(new_buff, buf_ + begin_, (end_ - begin_));
delete[] buf_;
buf_ = new_buff;
cap_ = new_cap;
begin_ = 0;
end_ = (end_ - begin_);
return {};
}

// 两段
uint32_t pos = 0;
memcpy(new_buff, buf_ + begin_, (cap_ - begin_));
pos += (cap_ - begin_);
if (end_ != 0) {
memcpy(new_buff + pos, buf_, end_);
pos += end_;
}
delete[] buf_;
buf_ = new_buff;
cap_ = new_cap;
begin_ = 0;
end_ = pos;
return {};
}

错误状态Status

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class Status {
public:
Status() = default;

Status(const Status& status) = default;

Status& operator=(const Status& status) = default;

Status(Status&& status) noexcept;

Status(int32_t code, const std::string& message) : code_(code), message_(message) {}

Status(int32_t code, std::string&& message) : code_(code), message_(std::move(message)) {}

// 其他库也返回类似的Status对象,如trpc::Status
// 这里将其他Status对象转换为tcaplus::Status
template <class OtherStatus>
static Status Cast(OtherStatus& other);

// 返回错误码
[[nodiscard]] int32_t Code() const { return this->code_; }

// 返回错误消息
[[nodiscard]] const std::string& Message() const { return this->message_; }

// 返回是否成功
[[nodiscard]] bool Ok() const { return this->code_ == 0; }

// 返回是否成功
[[nodiscard]] bool OK() const { return this->code_ == 0; }

// 返回是否成功
[[nodiscard]] bool NotOk() const { return this->code_ != 0; }

// 返回是否成功
[[nodiscard]] bool NotOK() const { return this->code_ != 0; }

// 设置是否能打印日志
void SetPrintable(bool printable) { printable_ = printable; }

// 返回是否能打印日志
[[nodiscard]] bool Printable() const { return printable_; }

[[nodiscard]] std::string ToString() const;

private:
int32_t code_ = 0; // 状态码
bool printable_ = false; // 是否打印日志
std::string message_; // 附加信息
};

template <class OtherStatus>
Status Status::Cast(OtherStatus& other) {
Status st;
if (other.OK()) {
return std::move(st);
}

if (other.GetFrameworkRetCode() != 0) {
st.code_ = other.GetFrameworkRetCode();
} else {
st.code_ = other.GetFuncRetCode();
}

const_cast<std::string&>(other.ErrorMessage()).swap(st.message_);
return std::move(st);
}

Status::Status(Status&& status) noexcept {
code_ = status.code_;
printable_ = status.printable_;
message_ = std::move(status.message_);
}

std::string Status::ToString() const {
std::stringstream string_stream;
string_stream << "[code_=" << this->code_ << ", "
<< "err_msg_=" << this->message_ << "]";
return string_stream.str();
}

C++缓冲区RingBuffer
http://example.com/2024/04/11/C-缓冲区RingBuffer/
作者
Liu XinWei
发布于
2024年4月11日
许可协议