Scorum
block_log.cpp
Go to the documentation of this file.
2 #include <fstream>
3 #include <fc/io/raw.hpp>
4 
5 #define LOG_READ (std::ios::in | std::ios::binary)
6 #define LOG_WRITE (std::ios::out | std::ios::binary | std::ios::app)
7 
8 namespace scorum {
9 namespace chain {
10 
11 namespace detail {
13 {
14 public:
15  optional<signed_block> head;
17  std::fstream block_stream;
18  std::fstream index_stream;
19  fc::path block_file;
20  fc::path index_file;
23 
24  inline void check_block_read()
25  {
26  try
27  {
28  if (block_write)
29  {
30  block_stream.close();
31  block_stream.open(block_file.generic_string().c_str(), LOG_READ);
32  block_write = false;
33  }
34  }
35  FC_LOG_AND_RETHROW()
36  }
37 
38  inline void check_block_write()
39  {
40  try
41  {
42  if (!block_write)
43  {
44  block_stream.close();
45  block_stream.open(block_file.generic_string().c_str(), LOG_WRITE);
46  block_write = true;
47  }
48  }
49  FC_LOG_AND_RETHROW()
50  }
51 
52  inline void check_index_read()
53  {
54  try
55  {
56  if (index_write)
57  {
58  index_stream.close();
59  index_stream.open(index_file.generic_string().c_str(), LOG_READ);
60  index_write = false;
61  }
62  }
63  FC_LOG_AND_RETHROW()
64  }
65 
66  inline void check_index_write()
67  {
68  try
69  {
70  if (!index_write)
71  {
72  index_stream.close();
73  index_stream.open(index_file.generic_string().c_str(), LOG_WRITE);
74  index_write = true;
75  }
76  }
77  FC_LOG_AND_RETHROW()
78  }
79 };
80 }
81 
83  : my(new detail::block_log_impl())
84 {
85  my->block_stream.exceptions(std::fstream::failbit | std::fstream::badbit);
86  my->index_stream.exceptions(std::fstream::failbit | std::fstream::badbit);
87 }
88 
90 {
91  flush();
92 }
93 
94 void block_log::open(const fc::path& file)
95 {
96  if (my->block_stream.is_open())
97  my->block_stream.close();
98  if (my->index_stream.is_open())
99  my->index_stream.close();
100 
101  my->block_file = file;
102  my->index_file = block_log_index_path(file);
103 
104  my->block_stream.open(my->block_file.generic_string().c_str(), LOG_WRITE);
105  my->index_stream.open(my->index_file.generic_string().c_str(), LOG_WRITE);
106  my->block_write = true;
107  my->index_write = true;
108 
109  /* On startup of the block log, there are several states the log file and the index file can be
110  * in relation to eachother.
111  *
112  * Block Log
113  * Exists Is New
114  * +------------+------------+
115  * Exists | Check | Delete |
116  * Index | Head | Index |
117  * File +------------+------------+
118  * Is New | Replay | Do |
119  * | Log | Nothing |
120  * +------------+------------+
121  *
122  * Checking the heads of the files has several conditions as well.
123  * - If they are the same, do nothing.
124  * - If the index file head is not in the log file, delete the index and replay.
125  * - If the index file head is in the log, but not up to date, replay from index head.
126  */
127  auto log_size = fc::file_size(my->block_file);
128  auto index_size = fc::file_size(my->index_file);
129 
130  if (log_size)
131  {
132  ilog("Log is nonempty");
133  my->head = read_head();
134  my->head_id = my->head->id();
135 
136  if (index_size)
137  {
138  my->check_block_read();
139  my->check_index_read();
140 
141  ilog("Index is nonempty");
142  uint64_t block_pos;
143  my->block_stream.seekg(-sizeof(uint64_t), std::ios::end);
144  my->block_stream.read((char*)&block_pos, sizeof(block_pos));
145 
146  uint64_t index_pos;
147  my->index_stream.seekg(-sizeof(uint64_t), std::ios::end);
148  my->index_stream.read((char*)&index_pos, sizeof(index_pos));
149 
150  if (block_pos < index_pos)
151  {
152  ilog("block_pos < index_pos, close and reopen index_stream");
153  construct_index();
154  }
155  else if (block_pos > index_pos)
156  {
157  ilog("Index is incomplete");
158  construct_index();
159  }
160  }
161  else
162  {
163  ilog("Index is empty");
164  construct_index();
165  }
166  }
167  else if (index_size)
168  {
169  ilog("Index is nonempty, remove and recreate it");
170  my->index_stream.close();
171  fc::remove_all(my->index_file);
172  my->index_stream.open(my->index_file.generic_string().c_str(), LOG_WRITE);
173  my->index_write = true;
174  }
175 }
176 
178 {
179  my.reset(new detail::block_log_impl());
180 }
181 
182 bool block_log::is_open() const
183 {
184  return my->block_stream.is_open();
185 }
186 
187 fc::path block_log::block_log_index_path(const fc::path& file)
188 {
189  return fc::path(file.generic_string() + ".index");
190 }
191 
192 uint64_t block_log::append(const signed_block& b)
193 {
194  try
195  {
196  my->check_block_write();
197  my->check_index_write();
198 
199  uint64_t pos = my->block_stream.tellp();
200  FC_ASSERT((uint64_t)my->index_stream.tellp()
201  == (std::fstream::streampos)sizeof(uint64_t) * ((uint64_t)b.block_num() - 1),
202  "Append to index file occuring at wrong position.",
203  ("position", (uint64_t)my->index_stream.tellp())("expected",
204  ((uint64_t)b.block_num() - 1) * sizeof(uint64_t)));
205  auto data = fc::raw::pack(b);
206  my->block_stream.write(data.data(), data.size());
207  my->block_stream.write((char*)&pos, sizeof(pos));
208  my->index_stream.write((char*)&pos, sizeof(pos));
209  my->head = b;
210  my->head_id = b.id();
211 
212  return pos;
213  }
214  FC_LOG_AND_RETHROW()
215 }
216 
218 {
219  my->block_stream.flush();
220  my->index_stream.flush();
221 }
222 
223 std::pair<signed_block, uint64_t> block_log::read_block(uint64_t pos) const
224 {
225  try
226  {
227  my->check_block_read();
228 
229  my->block_stream.seekg(pos);
230  std::pair<signed_block, uint64_t> result;
231  fc::raw::unpack(my->block_stream, result.first);
232  result.second = uint64_t(my->block_stream.tellg()) + 8;
233  return result;
234  }
235  FC_LOG_AND_RETHROW()
236 }
237 
238 optional<signed_block> block_log::read_block_by_num(uint32_t block_num) const
239 {
240  try
241  {
242  optional<signed_block> b;
243  uint64_t pos = get_block_pos(block_num);
244  if (pos != npos)
245  {
246  b = read_block(pos).first;
247  FC_ASSERT(b->block_num() == block_num, "Wrong block was read from block log.",
248  ("returned", b->block_num())("expected", block_num));
249  }
250  return b;
251  }
252  FC_LOG_AND_RETHROW()
253 }
254 
255 uint64_t block_log::get_block_pos(uint32_t block_num) const
256 {
257  try
258  {
259  my->check_index_read();
260 
261  if (!(my->head.valid() && block_num <= protocol::block_header::num_from_id(my->head_id) && block_num > 0))
262  return npos;
263  my->index_stream.seekg(sizeof(uint64_t) * (block_num - 1));
264  uint64_t pos;
265  my->index_stream.read((char*)&pos, sizeof(pos));
266  return pos;
267  }
268  FC_LOG_AND_RETHROW()
269 }
270 
272 {
273  try
274  {
275  my->check_block_read();
276 
277  uint64_t pos;
278  my->block_stream.seekg(-sizeof(pos), std::ios::end);
279  my->block_stream.read((char*)&pos, sizeof(pos));
280  return read_block(pos).first;
281  }
282  FC_LOG_AND_RETHROW()
283 }
284 
285 const optional<signed_block>& block_log::head() const
286 {
287  return my->head;
288 }
289 
290 void block_log::construct_index()
291 {
292  try
293  {
294  ilog("Reconstructing Block Log Index...");
295  my->index_stream.close();
296  fc::remove_all(my->index_file);
297  my->index_stream.open(my->index_file.generic_string().c_str(), LOG_WRITE);
298  my->index_write = true;
299 
300  uint64_t pos = 0;
301  uint64_t end_pos;
302  my->check_block_read();
303 
304  my->block_stream.seekg(-sizeof(uint64_t), std::ios::end);
305  my->block_stream.read((char*)&end_pos, sizeof(end_pos));
306  signed_block tmp;
307 
308  my->block_stream.seekg(pos);
309 
310  while (pos < end_pos)
311  {
312  fc::raw::unpack(my->block_stream, tmp);
313  my->block_stream.read((char*)&pos, sizeof(pos));
314  my->index_stream.write((char*)&pos, sizeof(pos));
315  }
316  }
317  FC_LOG_AND_RETHROW()
318 }
319 }
320 } // scorum::chain
#define LOG_READ
Definition: block_log.cpp:5
#define LOG_WRITE
Definition: block_log.cpp:6
uint64_t append(const signed_block &b)
Definition: block_log.cpp:192
const optional< signed_block > & head() const
Definition: block_log.cpp:285
static const uint64_t npos
Definition: block_log.hpp:63
optional< signed_block > read_block_by_num(uint32_t block_num) const
Definition: block_log.cpp:238
static fc::path block_log_index_path(const fc::path &block_log_file)
Definition: block_log.cpp:187
void open(const fc::path &file)
Definition: block_log.cpp:94
signed_block read_head() const
Definition: block_log.cpp:271
uint64_t get_block_pos(uint32_t block_num) const
Definition: block_log.cpp:255
std::pair< signed_block, uint64_t > read_block(uint64_t file_pos) const
Definition: block_log.cpp:223
optional< signed_block > head
Definition: block_log.cpp:15
void remove_all(db_index &db_idx, utils::bidir_range< const TObject > items)
Definition: db_accessor.hpp:62
fc::ripemd160 block_id_type
Definition: types.hpp:63
Definition: asset.cpp:15
static uint32_t num_from_id(const block_id_type &id)
Definition: block.cpp:14
block_id_type id() const
Definition: block.cpp:19