00001
00002
00003
00004 #ifndef AITOOLS_INVERTEDINDEX_INDEX_BUILDER_HPP
00005 #define AITOOLS_INVERTEDINDEX_INDEX_BUILDER_HPP
00006
00007 #include "System.hpp"
00008 #include "Record.hpp"
00009 #include "Builder.hpp"
00010 #include "Logging.hpp"
00011 #include "Checksum.hpp"
00012 #include "Converter.hpp"
00013 #include "RecordReader.hpp"
00014 #include "StorageBuilder.hpp"
00015 #include "PostlistSorter.hpp"
00016 #include "PostlistBuilder.hpp"
00017 #include "InvertedFileReader.hpp"
00018 #include <boost/algorithm/string.hpp>
00019 #include <iomanip>
00020 #include <cstdio>
00021
00022 namespace aitools {
00023 namespace invertedindex {
00024
00034 template<typename Value>
00035 class IndexBuilder : public Builder {
00036
00037 private:
00038
00039 typedef std::vector<FILE*> FileVector;
00040 typedef Configuration::Sorting Sorting;
00041 typedef std::tr1::unordered_map<std::string,
00042 PostlistBuilder::SharedPointer> BuilderMap;
00043
00044 private:
00045
00046 static const std::string tmp_dir;
00047 static const std::string tmp_file;
00048
00049 public:
00050
00051 static const std::string header_file;
00052
00053 public:
00054
00058 IndexBuilder(RecordReader<Value>* reader = new InvertedFileReader<Value>);
00059
00063 ~IndexBuilder();
00064
00065 public:
00066
00067 void build(const Configuration& config) throw (std::exception);
00068
00069 private:
00070
00079 static void build_postlists(const bfs::path& path, BuilderMap& builders)
00080 throw (std::exception);
00081
00082 static const Configuration& check_config(const Configuration& config)
00083 throw (std::exception);
00084
00085 static void store_header(const Header& header, const bfs::path& path)
00086 throw (std::exception);
00087
00088 static void print_header(const Header& header);
00089
00090 private:
00091
00092 void index_random_records_() throw (std::exception);
00093
00094 void index_sorted_records_() throw (std::exception);
00095
00096 bfs::path partition_records_() throw (std::exception);
00097
00098 private:
00099
00100 Header header_;
00101 Configuration config_;
00102 RecordReader<Value>* reader_;
00103
00104 };
00105
00106 template<typename Value>
00107 const std::string
00108 IndexBuilder<Value>::tmp_dir("tmp");
00109
00110 template<typename Value>
00111 const std::string
00112 IndexBuilder<Value>::tmp_file("tmp.");
00113
00114 template<typename Value>
00115 const std::string
00116 IndexBuilder<Value>::header_file("header");
00117
00118 template<typename Value>
00119 IndexBuilder<Value>::IndexBuilder(RecordReader<Value>* reader)
00120 : reader_(reader)
00121 {
00122 assert(reader);
00123 }
00124
00125 template<typename Value>
00126 IndexBuilder<Value>::~IndexBuilder()
00127 {
00128 delete reader_;
00129 }
00130
00131
00132
00133 template<typename Value>
00134 void
00135 IndexBuilder<Value>::build_postlists
00136 (const bfs::path& path, BuilderMap& builders) throw (std::exception)
00137 {
00138 GenericRecord record;
00139 BuilderMap::const_iterator it;
00140 BuilderMap::const_iterator end(builders.end());
00141 FILE* bucket(System::fopen(path, "rb"));
00142 while (record.read(bucket))
00143 {
00144 if ((it = builders.find(record.key())) == end)
00145 {
00146 PostlistBuilder::SharedPointer builder;
00147 builder.reset(new PostlistBuilder);
00148 builder->set_checksum(Checksum::hash16(record.key()));
00149 it = builders.insert(std::make_pair(record.key(), builder)).first;
00150 }
00151 it->second->append(record.value());
00152 }
00153 System::fclose(bucket);
00154 }
00155
00156 template<typename Value>
00157 const Configuration&
00158 IndexBuilder<Value>::check_config(const Configuration& config)
00159 throw (std::exception)
00160 {
00161 if (!bfs::exists(config.input_directory()))
00162 {
00163 std::string msg("Does not exist");
00164 Exception::throw_invalid_argument(msg, config.input_directory());
00165 }
00166 if (!bfs::exists(config.index_directory()))
00167 {
00168 std::string msg("Does not exist");
00169 Exception::throw_invalid_argument(msg, config.index_directory());
00170 }
00171 if (bfs::is_empty(config.input_directory()))
00172 {
00173 std::string msg("Is empty");
00174 Exception::throw_invalid_argument(msg, config.input_directory());
00175 }
00176 if (!bfs::is_empty(config.index_directory()))
00177 {
00178 std::string msg("Is not empty");
00179 Exception::throw_invalid_argument(msg, config.index_directory());
00180 }
00181 return config;
00182 }
00183
00184 template<typename Value>
00185 void
00186 IndexBuilder<Value>::store_header(const Header& header, const bfs::path& path)
00187 throw (std::exception)
00188 {
00189 bfs::ofstream ofs(path, std::ios::binary);
00190 if (!ofs)
00191 {
00192 Exception::throw_invalid_argument("Cannot create", path);
00193 }
00194 else if (!header.SerializeToOstream(&ofs))
00195 {
00196 Exception::throw_runtime_error("Cannot serialize header");
00197 }
00198 ofs.close();
00199 }
00200
00201 template<typename Value>
00202 void
00203 IndexBuilder<Value>::print_header(const Header& header)
00204 {
00205 double payload((double)header.payload() / 1024 / 1024);
00206 double runtime((double)(header.indexing_duration()) / 60);
00207 std::clog << "\nNumber of keys = " << header.key_count()
00208 << "\nNumber of values = " << header.value_count()
00209 << std::fixed << std::setprecision(2)
00210 << "\nData payload size = " << payload << " Megabyte"
00211 << "\nIndexing duration = " << runtime << " Minutes\n"
00212 << std::endl;
00213 }
00214
00215
00216
00217 template<typename Value>
00218 void
00219 IndexBuilder<Value>::build(const Configuration& config) throw (std::exception)
00220 {
00221 config_ = check_config(config);
00222 header_.set_value_class_name(Value::classname);
00223 header_.set_major_version(INDEX_VERSION_MAJOR);
00224 header_.set_minor_version(INDEX_VERSION_MINOR);
00225 header_.set_indexing_duration(Logging::time());
00226 header_.set_postlist_sorting(config.postlist_sorting());
00227 if (config.input_format() == Configuration::REAL_INVERTED)
00228 {
00229 index_sorted_records_();
00230 }
00231 else
00232 {
00233 index_random_records_();
00234 }
00235 Logging::log("Indexing succeeded.");
00236 header_.set_indexing_duration(Logging::time()-header_.indexing_duration());
00237 store_header(header_, bfs::path(config.index_directory()) / header_file);
00238 print_header(header_);
00239 }
00240
00241 template<typename Value>
00242 void
00243 IndexBuilder<Value>::index_random_records_() throw (std::exception)
00244 {
00245
00246
00247 const bfs::path bucket_dir(partition_records_());
00248
00249
00250
00251
00252 int64_t key_cnt(0);
00253 int64_t val_cnt(0);
00254 int64_t payload(0);
00255 Quantile quantile;
00256 BuilderMap builders;
00257 StorageBuilder storage;
00258 Postlist<Value> postlist;
00259 bfs::directory_iterator dend;
00260 storage.open(config_.index_directory());
00261 BuilderMap::const_iterator bend(builders.end());
00262 Sorting sort_mode(config_.postlist_sorting());
00263 for (bfs::directory_iterator dit(bucket_dir); dit != dend; ++dit)
00264 {
00265 Logging::log("Gather Step " + dit->path().string());
00266
00267 build_postlists(dit->path(), builders);
00268 for (BuilderMap::iterator bit(builders.begin()); bit != bend; ++bit)
00269 {
00270 postlist = bit->second->build();
00271 PostlistSorter::sort(postlist, quantile, sort_mode);
00272 storage.put(bit->first, postlist.iterator());
00273 if (sort_mode != Configuration::DISABLED)
00274 {
00275 storage.put(bit->first, quantile);
00276 }
00277 key_cnt++;
00278 val_cnt += postlist.length();
00279 payload += postlist.payload();
00280 }
00281 bfs::remove(dit->path());
00282 builders.clear();
00283 }
00284 header_.set_value_count(val_cnt);
00285 header_.set_key_count(key_cnt);
00286 header_.set_payload(payload);
00287 bfs::remove(bucket_dir);
00288 storage.close();
00289 }
00290
00291 template<typename Value>
00292 void
00293 IndexBuilder<Value>::index_sorted_records_() throw (std::exception)
00294 {
00295 int64_t key_cnt(0);
00296 int64_t val_cnt(0);
00297 int64_t payload(0);
00298 std::string key;
00299 Quantile quantile;
00300 Record<Value> record;
00301 GenericRecord grecord;
00302 StorageBuilder storage;
00303 PostlistBuilder builder;
00304 Postlist<Value> postlist;
00305 bfs::directory_iterator end;
00306 storage.open(config_.index_directory());
00307 Sorting sort_mode(config_.postlist_sorting());
00308 for (bfs::directory_iterator it(config_.input_directory()); it != end; ++it)
00309 {
00310 Logging::log("Gather Step " + it->path().string());
00311
00312 reader_->open(it->path());
00313 while (reader_->next(record))
00314 {
00315 record.to_generic(grecord);
00316 if (grecord.key().size() > GenericRecord::max_key_size)
00317 {
00318 Exception::throw_out_of_range("Too long key", record);
00319 }
00320 if (grecord.value().size() > GenericRecord::max_val_size)
00321 {
00322 Exception::throw_out_of_range("Too long value", record);
00323 }
00324 if (key.empty())
00325 {
00326 key = grecord.key();
00327 }
00328 else if (grecord.key() != key)
00329 {
00330 postlist = builder.build();
00331 PostlistSorter::sort(postlist, quantile, sort_mode);
00332 storage.put(key, postlist.iterator());
00333 if (sort_mode != Configuration::DISABLED)
00334 {
00335 storage.put(key, quantile);
00336 }
00337 key_cnt++;
00338 val_cnt += postlist.length();
00339 payload += postlist.payload();
00340 builder.set_checksum(Checksum::hash16(grecord.key()));
00341 key = grecord.key();
00342 }
00343 builder.append(grecord.value());
00344 }
00345 reader_->close();
00346 }
00347 postlist = builder.build();
00348 PostlistSorter::sort(postlist, quantile, sort_mode);
00349 storage.put(key, postlist.iterator());
00350 if (sort_mode != Configuration::DISABLED)
00351 {
00352 storage.put(key, quantile);
00353 }
00354 key_cnt++;
00355 val_cnt += postlist.length();
00356 payload += postlist.payload();
00357 header_.set_value_count(val_cnt);
00358 header_.set_key_count(key_cnt);
00359 header_.set_payload(payload);
00360 storage.close();
00361 }
00362
00363 template<typename Value>
00364 bfs::path
00365 IndexBuilder<Value>::partition_records_() throw (std::exception)
00366 {
00367
00368 const bfs::path bucket_dir(bfs::path(config_.index_directory()) / tmp_dir);
00369 const uint64_t payload(System::directory_size(config_.input_directory()));
00370 if (config_.available_memory() < 1000) config_.set_available_memory(1000);
00371 unsigned bucket_count(payload / (700000 * config_.available_memory()));
00372 bucket_count = Checksum::next_prime(bucket_count);
00373 std::vector<FILE*> bucket_files(bucket_count);
00374 bfs::create_directory(bucket_dir);
00375 std::string bucket_file_name;
00376 for (unsigned i(0); i != bucket_count; ++i)
00377 {
00378 bucket_file_name = tmp_file + Converter::ui32_to_str(i);
00379 bucket_files[i] = System::fopen(bucket_dir / bucket_file_name, "wb");
00380 }
00381
00382
00383 uint32_t checksum;
00384 Record<Value> record;
00385 GenericRecord gen_record;
00386 const bfs::directory_iterator end;
00387 for (bfs::directory_iterator it(config_.input_directory()); it != end; ++it)
00388 {
00389 Logging::log("Scatter Step " + it->path().string());
00390
00391 reader_->open(it->path());
00392 while (reader_->next(record))
00393 {
00394 record.to_generic(gen_record);
00395 if (gen_record.key().size() > GenericRecord::max_key_size)
00396 {
00397 Exception::throw_out_of_range("Too long key", record);
00398 }
00399 if (gen_record.value().size() > GenericRecord::max_val_size)
00400 {
00401 Exception::throw_out_of_range("Too long value", record);
00402 }
00403 checksum = Checksum::hash32(gen_record.key());
00404 gen_record.write(bucket_files[checksum % bucket_count]);
00405 }
00406 reader_->close();
00407 }
00408
00409
00410 for (unsigned i(0); i != bucket_count; ++i)
00411 {
00412 System::fclose(bucket_files[i]);
00413 }
00414 return bucket_dir;
00415 }
00416
00417 }
00418 }
00419
00420 #endif // AITOOLS_INVERTEDINDEX_INDEX_BUILDER_HPP