From 1db82dee51be1bd9667db6871f5f32be4db25bce Mon Sep 17 00:00:00 2001 From: cppenthu Date: Wed, 2 Jul 2008 02:36:39 +0000 Subject: [PATCH] ftfileprovider.cc - takes care of getting data from a requested file ftfilecreator.cc -- creates a file, tracks missing chunks, timed out chunks etc git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@626 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/ft/Makefile | 14 +- libretroshare/src/ft/ftfilecreator.cc | 416 +++++++++++++++++++++ libretroshare/src/ft/ftfilecreator.h | 71 +++- libretroshare/src/ft/ftfilecreatortest.cc | 112 ++++++ libretroshare/src/ft/ftfileprovider.cc | 117 ++++++ libretroshare/src/ft/ftfileprovider.h | 12 +- libretroshare/src/ft/ftfileprovidertest.cc | 32 ++ 7 files changed, 763 insertions(+), 11 deletions(-) create mode 100644 libretroshare/src/ft/ftfilecreator.cc create mode 100644 libretroshare/src/ft/ftfilecreatortest.cc create mode 100644 libretroshare/src/ft/ftfileprovider.cc create mode 100644 libretroshare/src/ft/ftfileprovidertest.cc diff --git a/libretroshare/src/ft/Makefile b/libretroshare/src/ft/Makefile index 8011fc37d..a65c1f3b7 100644 --- a/libretroshare/src/ft/Makefile +++ b/libretroshare/src/ft/Makefile @@ -7,18 +7,22 @@ RS_TOP_DIR = .. include $(RS_TOP_DIR)/scripts/config.mk ############################################################### -RSOBJ = +RSOBJ = ftfileprovider.o ftfilecreator.o -TESTOBJ = #ftcachetest.o +TESTOBJ = ftfileprovidertest.o ftfilecreatortest.o -TESTS = #ftcachetest +TESTS = ftfileprovidertest ftfilecreatortest all: librs tests -#ftcachetest : ftcachetest.o $(OBJ) -# $(CC) $(CFLAGS) -o ftcachetest ftcachetest.o $(OBJ) $(LIBS) +ftfilecreatortest : ftfilecreatortest.o $(RSOBJ) + $(CC) $(CFLAGS) -o ftfilecreatortest ftfilecreatortest.o $(RSOBJ) $(LIBS) + +ftfileprovidertest : ftfileprovidertest.o $(RSOBJ) + $(CC) $(CFLAGS) -o ftfileprovidertest ftfileprovidertest.o $(RSOBJ) $(LIBS) ############################################################### include $(RS_TOP_DIR)/scripts/rules.mk ############################################################### + diff --git a/libretroshare/src/ft/ftfilecreator.cc b/libretroshare/src/ft/ftfilecreator.cc new file mode 100644 index 000000000..e47a1836c --- /dev/null +++ b/libretroshare/src/ft/ftfilecreator.cc @@ -0,0 +1,416 @@ +#include "ftfilecreator.h" + + +ftFileCreator::ftFileCreator(std::string path, uint64_t size, std::string +hash, std::string chunker="default"): ftFileProvider(path,size,hash) { + /* any inits to do?*/ + initialize(chunker); +} + +void ftFileCreator::initialize(std::string chunker) { + RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/ + if (chunker == "default") + fileChunker = new ftFileChunker(total_size); + else + fileChunker = new ftFileRandomizeChunker(total_size); + + fileChunker->splitFile(); +} + +int ftFileCreator::initializeFileAttrs() //not override? +{ + RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/ + /* check if the file exists */ + + { + std::cout << + "ftFileProvider::initializeFileAttrs() Filename: " << file_name; + } + + /* attempt to open file in writing mode*/ + + fd = fopen(file_name.c_str(), "w+b"); + if (!fd) + { + std::cout << + "ftFileProvider::initializeFileAttrs() Failed to open (w+b): " + << file_name << std::endl; + return 0; + + } + + + /* if it opened, find it's length */ + /* move to the end */ + if (0 != fseek(fd, 0L, SEEK_END)) + { + std::cerr << "ftFileProvider::initializeFileAttrs() Seek Failed" << std::endl; + //s->status = (PQIFILE_FAIL | PQIFILE_FAIL_NOT_SEEK);*/ + return 0; + } + + /*s->recv_size = ftell(fd); /* get the file length */ + //s->total_size = s->size; /* save the total length */ + //s->fd = fd;*/ + recv_size = ftell(fd); /* get the file length */ + /*total_size is unchanged its set at construction*/ + + + return 1; +} + +bool ftFileCreator::addFileData(uint64_t offset, uint32_t chunk_size, void *data) +{ + RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/ + /* check the status */ + + if (fd==NULL) { + int init = initializeFileAttrs(); + if (init ==0) { + std::cerr <<"Initialization Failed" << std::endl; + return 0; + } + } + /* check its at the correct location */ + if (offset + chunk_size > this->total_size) + { + chunk_size = total_size - offset; + std::cerr <<"Chunk Size greater than total file size, adjusting chunk " + << "size " << chunk_size << std::endl; + + } + + /* go to the offset of the file */ + if (0 != fseek(this->fd, offset, SEEK_SET)) + { + std::cerr << "ftFileCreator::addFileData() Bad fseek" << std::endl; + return 0; + } + + long int pos; + pos = ftell(fd); + std::cerr << pos << " BEFORE RECV SIZE "<< recv_size << std::endl; + + /* add the data */ + if (1 != fwrite(data, chunk_size, 1, this->fd)) + { + std::cerr << "ftFileCreator::addFileData() Bad fwrite" << std::endl; + return 0; + } + + this->recv_size += chunk_size; + + pos = ftell(fd); + std::cerr << pos << " RECV SIZE "<< recv_size << std::endl; + //Notify ftFileChunker that all are received + fileChunker->notifyReceived(offset,chunk_size); + + /* if we've completed the request this time */ + /*if (s->req_loc + s->req_size == s->recv_size) + { + s->lastDelta = time(NULL) - s->lastTS; + } + + if (s->recv_size == s->total_size) + { + pqioutput(PQL_DEBUG_BASIC, ftfilerzone, + "ftfiler::addFileData() File Complete!"); + s->status = PQIFILE_COMPLETE; + + ///* HANDLE COMPLETION HERE + //completeFileTransfer(s); Notify ftFileChunker that all are received + }*/ + + return 1; +} + + + + + +ftFileCreator::~ftFileCreator(){ +} + + +bool ftFileCreator::getMissingChunk(uint64_t &offset, uint32_t &chunk) { + fileChunker->getMissingChunk(offset, chunk); +} + +/*********************************************************** +* +* FtFileChunker methods +* +***********************************************************/ + +ftFileChunker::ftFileChunker(uint64_t size): file_size(size), std_chunk_size(10000), monitorPeriod(30) { + /* any inits to do?*/ + std::cout << "Constructor ftFileChunker\n"; + srand ( time(NULL) ); + aggregate_status = 0; +} + +ftFileChunker::~ftFileChunker(){ + std::cout << "Destructor of ftFileChunker\n"; + std::vector::iterator it; + for(int i=0; imax_chunk_size << " " << allocationTable.at(j)->chunk_status << std::endl; + } + return 1; +} + + +bool ftFileChunker::getMissingChunk(uint64_t &offset, uint32_t &chunk) { + //This method sets the offset, chunk may be reset if needed + RsStackMutex stack(chunkerMutex); /********** STACK LOCKED MTX ******/ + std::cerr << "Calling getMissingChunk with chunk="<< chunk << std::endl; + int i =0; + bool found = false; + int chunks_after = 0; + int chunks_rem = 0; + + if(aggregate_status == num_chunks * ftChunk::RECEIVED) + return found; + + + while(imax_chunk_size >=chunk){ + offset = allocationTable.at(i)->offset; + chunks_after = chunk/std_chunk_size; //10KB + chunks_rem = chunk % std_chunk_size; + chunk -= chunks_rem; + std::cout << "Found " << chunk << " at " << i << " "<< chunks_after << std::endl; + allocationTable.at(i)->max_chunk_size=0; + allocationTable.at(i)->timestamp = time(NULL); + allocationTable.at(i)->chunk_status = ftChunk::ALLOCATED; + found = true; + break; + } + i++; + } + + if (!found) { + i=0; + uint64_t min = allocationTable.at(i)->max_chunk_size - chunk; + uint64_t diff = min; + int mini = -1; + while(imax_chunk_size-chunk; + if(diff <= min && diff >0){ + min = allocationTable.at(i)->max_chunk_size - chunk; + mini = i; + } + i++; + } + if (min > -1) { + offset = allocationTable.at(mini)->offset; + chunk = allocationTable.at(mini)->max_chunk_size; + chunks_after = chunk/std_chunk_size; //10KB + chunks_rem = chunk % std_chunk_size; + chunk -= chunks_rem; + allocationTable.at(mini)->max_chunk_size=0; + allocationTable.at(mini)->timestamp = time(NULL); + allocationTable.at(mini)->chunk_status = ftChunk::ALLOCATED; + found = true; + } + + } //if not found + + if (found) { + std::cout << "Chunks remaining " << chunks_rem << std::endl; + // update all previous chunks max available size + for(int j=0;jmax_chunk_size >0) + allocationTable.at(j)->max_chunk_size -= chunk; + } + + for(int j=i;jmax_chunk_size = 0; + allocationTable.at(j)->chunk_status = ftChunk::ALLOCATED; + + } + + for(int j=0;jmax_chunk_size << " " << allocationTable.at(j)->chunk_status << std::endl; + } + } + return found; +} + +/* This should run on a separate thread when ftFileChunker is initialized*/ +int ftFileChunker::monitor() { + int reset = 0; + std::cout<<"Running monitor.."<chunk_status == ftChunk::ALLOCATED && allocationTable.at(j)->timestamp - time(NULL) > 30){ + allocationTable.at(j)->chunk_status = ftChunk::AVAIL; + reset++; + } + } + return reset; +} + +void ftFileChunker::setMonitorPeriod(int period) { + monitorPeriod = period; +} + +void ftFileChunker::run(){ + + while(1) + { + + for(int i = 0; i < monitorPeriod; i++) + { + +/********************************** WINDOWS/UNIX SPECIFIC PART ******************/ +#ifndef WINDOWS_SYS + sleep(1); +#else + + Sleep(1000); +#endif +/********************************** WINDOWS/UNIX SPECIFIC PART ******************/ + } + monitor(); + } + +} + + +ftFileRandomizeChunker::ftFileRandomizeChunker(uint64_t size): +ftFileChunker(size) { + +} + +ftFileRandomizeChunker::~ftFileRandomizeChunker(){ + std::cout << "Destructor of ftFileRandomizeChunker\n"; +} + + + +bool ftFileRandomizeChunker::getMissingChunk(uint64_t &offset, uint32_t &chunk) { + //This method sets the offset, chunk may be reset if needed + RsStackMutex stack(chunkerMutex); /********** STACK LOCKED MTX ******/ + std::cerr << "Calling getMissingChunk with chunk="<< chunk << std::endl; + int i =0; + bool found = false; + int chunks_after = 0; + int chunks_rem = 0; + + if(aggregate_status == num_chunks * ftChunk::RECEIVED) + return found; + + std::vector randomIndex; + while(imax_chunk_size >=chunk){ + randomIndex.push_back(i); + } + i++; + } + + /* test to make sure its picking every index*/ + if (randomIndex.size()>0) { + int rnum = rand() % randomIndex.size(); + i = randomIndex.at(rnum); + std::cout << "i=" <offset; + chunks_after = chunk/std_chunk_size; //10KB + chunks_rem = chunk % std_chunk_size; + chunk -= chunks_rem; + std::cout << "Found " << chunk << " at index =" << i << " "<< chunks_after << std::endl; + allocationTable.at(i)->max_chunk_size=0; + allocationTable.at(i)->timestamp = time(NULL); + allocationTable.at(i)->chunk_status = ftChunk::ALLOCATED; + found = true; + } + + if (!found) { + i=0; + uint64_t min = allocationTable.at(i)->max_chunk_size - chunk; + uint64_t diff = min; + int mini = -1; + while(imax_chunk_size-chunk; + if(diff <= min && diff >0){ + min = allocationTable.at(i)->max_chunk_size - chunk; + mini = i; + } + i++; + } + if (min > -1) { + offset = allocationTable.at(mini)->offset; + chunk = allocationTable.at(mini)->max_chunk_size; + chunks_after = chunk/std_chunk_size; //10KB + chunks_rem = chunk % std_chunk_size; + chunk -= chunks_rem; + allocationTable.at(mini)->max_chunk_size=0; + allocationTable.at(mini)->timestamp = time(NULL); + allocationTable.at(mini)->chunk_status = ftChunk::ALLOCATED; + found = true; + } + + } //if not found + + if (found) { + std::cout << "Chunks remaining " << chunks_rem << std::endl; + // update all previous chunks max available size + for(int j=0;jmax_chunk_size >0) + allocationTable.at(j)->max_chunk_size -= chunk; + } + + for(int j=i;jmax_chunk_size = 0; + allocationTable.at(j)->chunk_status = ftChunk::ALLOCATED; + + } + +/* for(int j=0;jmax_chunk_size << " " << allocationTable.at(j)->chunk_status << std::endl; + }*/ + } + return found; +} + + + +int ftFileChunker::notifyReceived(uint64_t offset, uint32_t chunk_size) { + RsStackMutex stack(chunkerMutex); /********** STACK LOCKED MTX ******/ + int index = offset / std_chunk_size; + std::cout << "INDEX " << index << std::endl; + if(allocationTable.at(index)->chunk_status == ftChunk::ALLOCATED){ + allocationTable.at(index)->chunk_status = ftChunk::RECEIVED; + aggregate_status += ftChunk::RECEIVED; + } +} + +ftChunk::ftChunk(uint64_t offset,uint64_t chunk_size,time_t time, ftChunk::Status s) : offset(offset), max_chunk_size(chunk_size), timestamp(time), chunk_status(s){ + +} diff --git a/libretroshare/src/ft/ftfilecreator.h b/libretroshare/src/ft/ftfilecreator.h index 8547d142f..eb8a944f2 100644 --- a/libretroshare/src/ft/ftfilecreator.h +++ b/libretroshare/src/ft/ftfilecreator.h @@ -32,25 +32,90 @@ * TODO: Serialiser Load / Save. * */ +#include "ftfileprovider.h" +#include "util/rsthreads.h" +#include +class ftChunk; +class ftFileChunker; class ftFileCreator: public ftFileProvider { public: - ftFileCreator(std::string savepath, uint64_t size, std::string hash); + ftFileCreator(std::string savepath, uint64_t size, std::string hash,std::string chunker); + ~ftFileCreator(); /* overloaded from FileProvider */ -virtual bool getFileData(uint64_t offset, uint32_t chunk_size, void *data); +//virtual bool getFileData(uint64_t offset, uint32_t chunk_size, void *data); +int initializeFileAttrs(); //not override? /* creation functions for FileCreator */ bool getMissingChunk(uint64_t &offset, uint32_t &chunk); bool addFileData(uint64_t offset, uint32_t chunk_size, void *data); private: + /* structure to track missing chunks */ + + /* structure to hold*/ - /* TO BE DECIDED */ +// std::string save_path; use file_name from parent +// uint64_t total_size; + uint64_t recv_size; + std::string hash; + ftFileChunker *fileChunker; + void initialize(std::string); + RsMutex ftcMutex; +}; + +/* + This class can either be specialized to follow a different splitting + policy or have an argument to indicate different policies +*/ +class ftFileChunker : public RsThread { +public: + /* Does this require hash?? */ + ftFileChunker(uint64_t size); + virtual ~ftFileChunker(); + /* Breaks up the file into evenly sized chunks + Initializes all chunks to never_requested + */ + int splitFile(); + virtual void run(); + virtual bool getMissingChunk(uint64_t &offset, uint32_t &chunk); + bool getMissingChunkRandom(uint64_t &offset, uint32_t &chunk); + int notifyReceived(uint64_t offset, uint32_t chunk_size); + int monitor(); + void setMonitorPeriod(int period); +protected: + uint64_t file_size; + uint64_t num_chunks; + uint64_t std_chunk_size; + std::vector allocationTable; + unsigned int aggregate_status; + int monitorPeriod; + RsMutex chunkerMutex; /********** STACK LOCKED MTX ******/ }; +class ftFileRandomizeChunker : public ftFileChunker { +public: + ftFileRandomizeChunker(uint64_t size); + virtual bool getMissingChunk(uint64_t &offset, uint32_t &chunk); + ~ftFileRandomizeChunker(); + +}; + + +class ftChunk { +public: + enum Status {AVAIL, ALLOCATED, RECEIVED}; + ftChunk(uint64_t,uint64_t,time_t,Status); + uint64_t offset; + uint64_t max_chunk_size; + time_t timestamp; + Status chunk_status; + +}; + #endif // FT_FILE_PROVIDER_HEADER diff --git a/libretroshare/src/ft/ftfilecreatortest.cc b/libretroshare/src/ft/ftfilecreatortest.cc new file mode 100644 index 000000000..867a35466 --- /dev/null +++ b/libretroshare/src/ft/ftfilecreatortest.cc @@ -0,0 +1,112 @@ +#include "ftfilecreator.h" + +main(){ + /*Testing default file chunker*/ + uint32_t total_size = 1000000; /*100KB*/ + ftFileChunker fc(total_size); + fc.setMonitorPeriod(10); + fc.splitFile(); + std::cout << "Starting fc monitor thread\n"; + fc.start(); + + + /*Simulate calls from transfer module*/ + uint64_t offset; + uint32_t csize = 40000; /* 40KB*/ + if (fc.getMissingChunk(offset, csize)){ + std::cout << "Missing Chunk's offset=" << offset << " chunk_size=" << csize << std::endl; + } + sleep(5); + csize = 10000; + if (fc.getMissingChunk(offset, csize)){ + std::cout << "Missing Chunk's offset=" << offset << " chunk_size=" << csize << std::endl; + } + + csize = 15000; /* Ask more than the multiple of std_chunk_size*/ + if (fc.getMissingChunk(offset, csize)){ + std::cout << "Missing Chunk's offset=" << offset << " chunk_size=" << csize << std::endl; + } + + sleep(10); + /*Test file creator*/ + ftFileCreator fcreator("somefile",100000,"hash","default"); + csize = 40000; + if (fcreator.getMissingChunk(offset, csize)){ + std::cout << "Missing Chunk's offset=" << offset << " chunk_size=" << csize << std::endl; + } + + csize = 10000; + if (fcreator.getMissingChunk(offset, csize)){ + std::cout << "Missing Chunk's offset=" << offset << " chunk_size=" << csize << std::endl; + } + + csize = 15000; + if (fcreator.getMissingChunk(offset, csize)){ + std::cout << "Missing Chunk's offset=" << offset << " chunk_size=" << csize << std::endl; + } + + /* Test file creator adding data to file out-of-order*/ + char* alpha = "abcdefghij"; + std::cerr << "Call to addFileData =" << fcreator.addFileData(10,10,alpha ); + char* numerical = "1234567890"; + std::cerr << "Call to addFileData =" << fcreator.addFileData(0,10,numerical ); + + + /* Test file creator can write out of order/randomized chunker */ + ftFileCreator franc("somefile1",50000,"hash", "randomize"); + csize = 30000; + if (franc.getMissingChunk(offset, csize)){ + std::cout << "Offset " << offset << " Csize " << csize << std::endl; + } + + char* allA = (char*) malloc(csize); + for(int i=0;i total_size) + { + data_size = total_size - base_loc; + std::cerr <<"Chunk Size greater than total file size, adjusting chunk size " << data_size << std::endl; + } + + if (data_size > 0) + { + + /* seek for base_loc */ + fseek(fd, base_loc, SEEK_SET); + + void *data = malloc(chunk_size); + /* read the data */ + if (1 != fread(data, data_size, 1, fd)) + { + std::cerr << "ftFileProvider::getFileData() Failed to get data!"; + free(data); + return 0; + } + + + /* Update status of ftFileStatus to reflect last usage (for GUI display) + * We need to store. + * (a) Id, + * (b) Offset, + * (c) Size, + * (d) timestamp + */ + + time_t now = time(NULL); + //s->id = id; + req_loc = offset; + req_size = data_size; + lastTS = now; + } + else { + std::cerr << "No data to read" << std::endl; + return 0; + } + return 1; +} + + + + +int ftFileProvider::initializeFileAttrs() +{ + /* check if the file exists */ + + { + std::cout << + "ftFileProvider::initializeFileAttrs() Filename: " << file_name; + } + + /* attempt to open file */ + + fd = fopen(file_name.c_str(), "r+b"); + if (!fd) + { + std::cout << + "ftFileProvider::initializeFileAttrs() Failed to open (r+b): " + << file_name << std::endl; + return 0; + + } + + + /* if it opened, find it's length */ + /* move to the end */ + if (0 != fseek(fd, 0L, SEEK_END)) + { + std::cerr << "ftFileProvider::initializeFileAttrs() Seek Failed" << std::endl; + //s->status = (PQIFILE_FAIL | PQIFILE_FAIL_NOT_SEEK);*/ + return 0; + } + + /*s->recv_size = ftell(fd); /* get the file length */ + //s->total_size = s->size; /* save the total length */ + //s->fd = fd;*/ + total_size = ftell(fd); + std::cout <<"exit init\n"; + return 1; +} diff --git a/libretroshare/src/ft/ftfileprovider.h b/libretroshare/src/ft/ftfileprovider.h index 1b78e5c5a..5c6cdf124 100644 --- a/libretroshare/src/ft/ftfileprovider.h +++ b/libretroshare/src/ft/ftfileprovider.h @@ -30,13 +30,15 @@ * ftFileProvider. * */ - +#include +#include +#include "util/rsthreads.h" class ftFileProvider { public: ftFileProvider(std::string path, uint64_t size, std::string hash); - ~ftFileProvider(); + virtual ~ftFileProvider(); /* array already allocated - * just move chunk_size bytes to void *data buffer. @@ -49,7 +51,7 @@ uint64_t getFileSize(); protected: - + virtual int initializeFileAttrs(); std::string file_name; std::string hash; uint64_t total_size; @@ -61,6 +63,10 @@ protected: uint64_t req_loc; uint32_t req_size; time_t lastTS; +/* Mutex Required for stuff below */ + + RsMutex ftPMutex; + }; diff --git a/libretroshare/src/ft/ftfileprovidertest.cc b/libretroshare/src/ft/ftfileprovidertest.cc new file mode 100644 index 000000000..c53f5753e --- /dev/null +++ b/libretroshare/src/ft/ftfileprovidertest.cc @@ -0,0 +1,32 @@ +#include "ftfileprovider.h" + +main(){ + ftFileProvider fp("dummy.txt",1,"ABCDEF"); + char data[2]; + long offset = 0; + for (int i=0;i<10;i++) { + + if (fp.getFileData(offset,2,&data)){ + std::cout <<"Recv data " << data[0] << std::endl; + } + else { + std::cout <<"Recv no data." << std::endl; + } + offset+=2; + } + + + ftFileProvider fp1("dummy1.txt",3,"ABCDEF"); + char data1[3]; + offset = 0; + for (int i=0;i<10;i++) { + + if (fp1.getFileData(offset,2,&data1)){ + std::cout <<"Recv data " << data1[0] << std::endl; + } + else { + std::cout <<"Revc no data" << std::endl; + } + offset+=2; + } +}