#include "mapreduce.hpp"
#include
#include
#include
#include [i]
#include
#include
namespace mapreduce
{
int
StringToInt(std::string str) // Convert a string to an integer.
{
    // Parses a leading, optionally signed decimal integer: leading whitespace is skipped and
    // any trailing characters are ignored ("12abc" -> 12). Returns 0 when no integer can be
    // parsed; since C++11 out-of-range input saturates to INT_MAX / INT_MIN.
    std::istringstream sin(str);
    // Initialise explicitly: before C++11 a failed extraction left the target unmodified,
    // so an unparsable string used to return an indeterminate value.
    int ret = 0;
    sin >> ret;
    return ret;
}
std::string
IntToString(int i) // Convert an integer to its decimal string representation.
{
std::ostringstream sout;
// NOTE(review): this span is corrupted. The text between "sout" on the next line and
// "&files)" two lines further down was lost (it looks like "<...>" runs were stripped).
// That removed the end of IntToString, whatever definitions followed it (the "ret",
// "tmp" and loop fragments belong to one of them), and the head of scandir().
// The file does not compile as-is. TODO: restore this region from version control.
sout ret;
std::string tmp;
for (size_t i = 0; i &files) // scan files
// scandir(dirname, files) (signature partly lost above): append the name of every entry
// of directory `dirname` to `files`, skipping "." and "..", in the order readdir()
// returns them. Only bare entry names are stored, not full paths. If the directory
// cannot be opened, the function returns silently and `files` is left unchanged.
{
DIR *dh = opendir(dirname.c_str());
if (!dh)
return;
struct dirent *pde = NULL;
while ((pde = readdir(dh)) != NULL)
{
std::string fname(pde->d_name);
if (fname == "." || fname == "..")
continue;
files.push_back(fname);
}
closedir(dh);
}
void
MapReduceInput::set_filebase(std::string dirname) //将文件名输入,设置好文件基地址,相当于一个接口
{
if (dirname.empty())
dirname = "/";
if (dirname[dirname.size() - 1] != '/')
dirname += "/";
DBUG_PRINT("setting filebase to: "filebase = dirname;
}
std::string
MapReduceInput::get_filebase()
{
    // Return the configured input directory, or the filesystem root ("/") if none
    // has been set yet.
    return this->filebase.empty() ? std::string("/") : this->filebase;
}
void
MapReduceOutput::set_filebase(std::string dirname)
{
    // Store the output directory, normalised to end with '/'. An empty argument
    // becomes "/" (appending '/' to "" yields exactly that).
    bool const needs_slash = dirname.empty() || dirname[dirname.size() - 1] != '/';
    if (needs_slash)
        dirname += '/';
    this->filebase = dirname;
}
std::string
MapReduceOutput::get_filebase()
{
    // Return the configured output directory, defaulting to the filesystem root
    // when it was never set.
    std::string base = this->filebase;
    if (base.empty())
        base = "/";
    return base;
}
MapReduceInput *
MapReduceSpecification::add_input()
{
    // Create a new, default-constructed input descriptor, record it in ilist, and
    // return it so the caller can configure it (e.g. via set_filebase()).
    // NOTE(review): presumably ~MapReduceSpecification deletes the entries of ilist;
    // confirm, otherwise every input leaks.
    MapReduceInput *pin = new MapReduceInput;
    try
    {
        this->ilist.push_back(pin);
    }
    catch (...)
    {
        // push_back failed (e.g. std::bad_alloc), so nothing else references pin yet.
        // Free it instead of leaking it, then propagate the error.
        delete pin;
        throw;
    }
    return pin;
}
MapReduceOutput *
MapReduceSpecification::output()
{
    // Expose the specification's mr_out member so the caller can configure it in place.
    // The pointer is only valid while this specification object is alive.
    MapReduceOutput *pout = &this->mr_out;
    return pout;
}
bool
Mapper::Emit(std::string const &key, std::string const &value)
{
    // Append one intermediate record, "<key>\t<value>\n", to the intermediate file
    // held by this mapper's current input (pmi->_pif).
    // NOTE(review): key and value are written unescaped; a '\t' or '\n' inside either
    // one will corrupt the record when it is parsed back.
    std::string record(key);
    record += '\t';
    record += value;
    record += '\n';
    // Written as a single item: fwrite reports 1 only when the whole record was written.
    return fwrite(record.c_str(), record.size(), 1, this->pmi->_pif) == 1;
}
void
ReduceInput::read_intermediate()
{
    // Load every "<key>\t<value>" line of the intermediate file (_pif) into
    // _intermediate as (key, value) pairs.
    std::string line;
    while (mapreduce::get_line(line, this->_pif))
    {
        // The element type was lost from this declaration in the original; explode()
        // splits a std::string into its '\t'-separated fields.
        std::vector<std::string> data = explode('\t', line);
        // Reset the buffer before the next read (presumably get_line appends to it;
        // confirm against get_line's definition).
        line.clear();
        // A line without a tab (e.g. a blank line) has no value field. Indexing data[1]
        // would read past the end of the vector, so skip such lines.
        if (data.size() < 2)
            continue;
        // NOTE(review): only the first field after the key is kept, so a value that
        // itself contains '\t' is truncated.
        this->_intermediate.push_back(std::make_pair(data[0], data[1]));
    }
}
void
ReduceInput::sort_intermediate()
{
std::sort(this->_intermediate.begin(),
this->_intermediate.end(),
intermediate_sorter());
}
ReduceInput::ReduceInput(FILE *_pfile, FILE *_pintermediate)
    : _pf(_pfile), _pif(_pintermediate), all_done(false)
{
    // _pfile is the output file written by Reducer::Emit; _pintermediate is the
    // intermediate file the records are read from.
    this->read_intermediate();
    this->sort_intermediate();
    // Start with an empty range at the front, then let set_next_range() select the
    // first key's run [f, l) and set _key. If there are no records at all, it sets
    // all_done instead.
    this->f = this->_intermediate.begin();
    this->l = this->f;
    this->set_next_range();
}
bool
ReduceInput::set_next_range()
{
    // Advance to the next run of equal keys. On success, [f, l) covers every record
    // whose key equals _key, and the function returns true. When no records remain,
    // it sets all_done and returns false.
    if (this->l == this->_intermediate.end())
    {
        this->all_done = true;
        return false;
    }
    this->f = this->l;
    this->_key = this->f->first;
    for (; this->l != this->_intermediate.end(); ++this->l)
    {
        if (this->l->first != this->_key)
            break;
    }
    return true;
}
bool
Reducer::Emit(std::string const &str)
{
    // Write one output record, "<current key>\t<str>\n", to the ReduceInput's output
    // file (pri->_pf). Returns false if the record was not completely written.
    std::string record = this->pri->_key;
    record.append("\t").append(str).append("\n");
    size_t const written = fwrite(record.c_str(), record.size(), 1, this->pri->_pf);
    return written == 1;
}
bool
MapInput::get_line()
{
    // Read the next line of the input file (_pf) into _line. The line counter _lno
    // is bumped only when a line was actually read.
    if (!mapreduce::get_line(this->_line, this->_pf))
        return false;
    ++this->_lno;
    return true;
}
// Thread entry point for the map phase; `vptr` is a worker_data*.
// Each worker opens /tmp/mapreduce_intermediate.<thr_no>.dat for writing. It then runs
// the user's Mapper (pdata->pspec->mapper) over every line of each file listed in
// pdata->pfiles, closes its files, and returns 0 (a null pointer) at the end.
//
// NOTE(review): several lines below are corrupted: the text between "std::cerr" and a
// later "->" was lost. As a result, the error handling after each failed fopen(), the
// loop header over pdata->pfiles, the fopen() of each input file and the MapInput `mi`
// setup are not visible. The file does not compile as-is. TODO: restore from version control.
// NOTE(review): the intermediate path is fixed and predictable. Two concurrent jobs would
// overwrite each other's files, and the /tmp name is open to symlink attacks; consider mkstemp().
void *
worker_proc(void *vptr)
{
worker_data *pdata = (worker_data*)(vptr);
/* Open the intermediate file in write mode. */
std::string ifile_name = "/tmp/mapreduce_intermediate." +
IntToString(pdata->thr_no) + ".dat";
FILE *pif = fopen(ifile_name.c_str(), "w");
if (!pif)
{
// NOTE(review): corrupted line: the error message, the early exit and the start of
// the loop over pdata->pfiles were lost here.
std::cerrpfiles->begin();
i != pdata->pfiles->end(); ++i)
{
/* Open it and pass the opened file handle to the get_line()
* function till there are lines to be read and also call the
* Mapping function.
*/
// NOTE(review): corrupted line: the open of the current input file as `pf` ("r" mode)
// was partly lost.
std::cerrc_str(), "r");
if (!pf)
{
// NOTE(review): corrupted line: the open-failure handling and the construction of
// `mi` were lost. What remains points the mapper's pmi at `mi`.
std::cerrpspec->mapper->pmi = &mi;
while (mi.get_line())
{
// NOTE(review): corrupted line: the rest of the debug message was lost. What
// remains calls the user's Map() once per line read.
DBUG_PRINT("Processing line: "pspec->mapper->Map(mi);
}
fclose(pf);
}
fclose(pif);
return 0;
}
// Run the job described by `spec`. Visible steps:
//  1. For each input in spec.ilist, list its directory with scandir(). Each file's full
//     path (filebase + entry name) is assigned round-robin to one of spec.num_thr
//     per-thread lists in `files`.
//  2. Prepare one worker_data per thread (the thread start-up code is not visible).
//  3. Reduce phase: call the reducer's Reduce() on a ReduceInput and advance with
//     set_next_range(); then close pf and pif and return true.
// `res` is not used in any visible line.
// NOTE(review): several lines are corrupted ("<...>" spans lost), so the loop bounds,
// the thread launch/join and the reduce set-up are missing. The file does not compile
// as-is. TODO: restore from version control.
// NOTE(review): files.resize(spec.num_thr) followed by thr_no %= spec.num_thr is
// undefined behaviour if num_thr can be 0. Consider guarding against that.
bool
MapReduce(MapReduceSpecification &spec, MapReduceResult &res)
{
// Presumably std::vector<std::vector<std::string> >: one list of input paths per thread.
std::vector > files;
files.resize(spec.num_thr);
int thr_no = 0;
/* For each directory containing the input file(s). */
// NOTE(review): corrupted line: the loop bound over spec.ilist and the declaration of
// file_list were merged.
for (size_t i = 0; i file_list;
mapreduce::scandir(spec.ilist[i]->get_filebase(), file_list);
/* For each file in this directory. */
// NOTE(review): corrupted line: the loop bound over file_list and the start of the
// fname = spec.ilist[i]->get_filebase() + file_list[j] expression were merged.
for (size_t j = 0; j get_filebase() + file_list[j];
files[thr_no].push_back(fname);
// Round-robin: the next file goes to the next thread, wrapping at num_thr.
++thr_no;
thr_no %= spec.num_thr;
}
}
// Presumably std::vector<worker_data>: one argument block per worker thread.
std::vector wdata;
wdata.resize(spec.num_thr);
/* For each thread. */
// NOTE(review): corrupted line: everything from the per-thread loop bound up to the
// reducer's Reduce(&ri) call was lost.
for (size_t i = 0; i Reduce(&ri);
ri.set_next_range();
}
fclose(pf);
fclose(pif);
return true;
}
}
|