From f30ed24a4ae1bb5ee7b3c6c84ea99add2908468c Mon Sep 17 00:00:00 2001 From: thunder2 Date: Sun, 16 Aug 2015 13:27:04 +0200 Subject: [PATCH] Moved gxs data from files into database - Added update to RsDataService - Added new table "DATABASE_RELEASE" to database Advantages: - Better performance because of the removed additional file access - Transaction safety - All groups and messages are stored in the database Attention: Please make a backup of your data folder before you try this version. The first start of the application will update the database and can take a little bit longer. Once the database was converted the messages cannot be read by older versions of the application. --- libretroshare/src/gxs/rsdataservice.cc | 688 ++++++++++++------------- libretroshare/src/gxs/rsdataservice.h | 35 +- 2 files changed, 344 insertions(+), 379 deletions(-) diff --git a/libretroshare/src/gxs/rsdataservice.cc b/libretroshare/src/gxs/rsdataservice.cc index 20d2443bc..8a238b199 100644 --- a/libretroshare/src/gxs/rsdataservice.cc +++ b/libretroshare/src/gxs/rsdataservice.cc @@ -38,18 +38,19 @@ #endif #include "rsdataservice.h" +#include "util/rsstring.h" #define MSG_TABLE_NAME std::string("MESSAGES") #define GRP_TABLE_NAME std::string("GROUPS") +#define DATABASE_RELEASE_TABLE_NAME std::string("DATABASE_RELEASE") #define GRP_LAST_POST_UPDATE_TRIGGER std::string("LAST_POST_UPDATE") #define MSG_INDEX_GRPID std::string("INDEX_MESSAGES_GRPID") // generic -#define KEY_NXS_FILE std::string("nxsFile") -#define KEY_NXS_FILE_OFFSET std::string("fileOffset") -#define KEY_NXS_FILE_LEN std::string("nxsFileLen") +#define KEY_NXS_DATA std::string("nxsData") +#define KEY_NXS_DATA_LEN std::string("nxsDataLen") #define KEY_NXS_IDENTITY std::string("identity") #define KEY_GRP_ID std::string("grpId") #define KEY_ORIG_GRP_ID std::string("origGrpId") @@ -62,6 +63,10 @@ #define KEY_NXS_HASH std::string("hash") #define KEY_RECV_TS std::string("recv_time_stamp") +// remove later +#define KEY_NXS_FILE_OLD std::string("nxsFile") +#define KEY_NXS_FILE_OFFSET_OLD std::string("fileOffset") +#define KEY_NXS_FILE_LEN_OLD std::string("nxsFileLen") // grp table columns #define KEY_KEY_SET std::string("keySet") @@ -93,6 +98,10 @@ #define KEY_MSG_STATUS std::string("msgStatus") #define KEY_CHILD_TS std::string("childTs") +// database release columns +#define KEY_DATABASE_RELEASE_ID std::string("id") +#define KEY_DATABASE_RELEASE_ID_VALUE 1 +#define KEY_DATABASE_RELEASE std::string("release") @@ -100,11 +109,10 @@ // generic #define COL_ACT_GROUP_ID 0 -#define COL_NXS_FILE 1 -#define COL_NXS_FILE_OFFSET 2 -#define COL_NXS_FILE_LEN 3 -#define COL_META_DATA 4 -#define COL_ACT_MSG_ID 5 +#define COL_NXS_DATA 1 +#define COL_NXS_DATA_LEN 2 +#define COL_META_DATA 3 +#define COL_ACT_MSG_ID 4 /*** meta column numbers ***/ @@ -128,6 +136,7 @@ #define COL_PARENT_GRP_ID 21 #define COL_GRP_RECV_TS 22 #define COL_GRP_REP_CUTOFF 23 +#define COL_GRP_DATA_LEN 24 // msg col numbers @@ -140,6 +149,7 @@ #define COL_MSG_NAME 12 #define COL_MSG_SERV_STRING 13 #define COL_MSG_RECV_TS 14 +#define COL_MSG_DATA_LEN 15 // generic meta shared col numbers #define COL_GRP_ID 0 @@ -161,10 +171,13 @@ const uint32_t RsGeneralDataService::GXS_MAX_ITEM_SIZE = 1572864; // 1.5 Mbytes RsDataService::RsDataService(const std::string &serviceDir, const std::string &dbName, uint16_t serviceType, RsGxsSearchModule * /* mod */, const std::string& key) - : RsGeneralDataService(), mDbMutex("RsDataService"), mServiceDir(serviceDir), mDbName(dbName), mDbPath(mServiceDir + "/" + dbName), mServType(serviceType), - mDb( new RetroDb(mDbPath, RetroDb::OPEN_READWRITE_CREATE, key)) { + : RsGeneralDataService(), mDbMutex("RsDataService"), mServiceDir(serviceDir), mDbName(dbName), mDbPath(mServiceDir + "/" + dbName), mServType(serviceType), mDb(NULL) +{ + bool isNewDatabase = !RsDirUtil::fileExists(mDbPath); - initialise(); + mDb = new RetroDb(mDbPath, RetroDb::OPEN_READWRITE_CREATE, key); + + initialise(isNewDatabase); // for retrieving msg meta msgMetaColumns.push_back(KEY_GRP_ID); msgMetaColumns.push_back(KEY_TIME_STAMP); msgMetaColumns.push_back(KEY_NXS_FLAGS); @@ -172,10 +185,11 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d msgMetaColumns.push_back(KEY_MSG_ID); msgMetaColumns.push_back(KEY_ORIG_MSG_ID); msgMetaColumns.push_back(KEY_MSG_STATUS); msgMetaColumns.push_back(KEY_CHILD_TS); msgMetaColumns.push_back(KEY_MSG_PARENT_ID); msgMetaColumns.push_back(KEY_MSG_THREAD_ID); msgMetaColumns.push_back(KEY_MSG_NAME); msgMetaColumns.push_back(KEY_NXS_SERV_STRING); msgMetaColumns.push_back(KEY_RECV_TS); + msgMetaColumns.push_back(KEY_NXS_DATA_LEN); // for retrieving actual data - msgColumns.push_back(KEY_GRP_ID); msgColumns.push_back(KEY_NXS_FILE); msgColumns.push_back(KEY_NXS_FILE_OFFSET); - msgColumns.push_back(KEY_NXS_FILE_LEN); msgColumns.push_back(KEY_NXS_META); msgColumns.push_back(KEY_MSG_ID); + msgColumns.push_back(KEY_GRP_ID); msgColumns.push_back(KEY_NXS_DATA); msgColumns.push_back(KEY_NXS_DATA_LEN); + msgColumns.push_back(KEY_NXS_META); msgColumns.push_back(KEY_MSG_ID); // for retrieving grp meta data grpMetaColumns.push_back(KEY_GRP_ID); grpMetaColumns.push_back(KEY_TIME_STAMP); grpMetaColumns.push_back(KEY_NXS_FLAGS); @@ -186,16 +200,11 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d grpMetaColumns.push_back(KEY_GRP_SIGN_FLAGS); grpMetaColumns.push_back(KEY_GRP_CIRCLE_ID); grpMetaColumns.push_back(KEY_GRP_CIRCLE_TYPE); grpMetaColumns.push_back(KEY_GRP_INTERNAL_CIRCLE); grpMetaColumns.push_back(KEY_GRP_ORIGINATOR); grpMetaColumns.push_back(KEY_GRP_AUTHEN_FLAGS); grpMetaColumns.push_back(KEY_PARENT_GRP_ID); grpMetaColumns.push_back(KEY_RECV_TS); - grpMetaColumns.push_back(KEY_GRP_REP_CUTOFF); - + grpMetaColumns.push_back(KEY_GRP_REP_CUTOFF); grpMetaColumns.push_back(KEY_NXS_DATA_LEN); // for retrieving actual grp data - grpColumns.push_back(KEY_GRP_ID); grpColumns.push_back(KEY_NXS_FILE); grpColumns.push_back(KEY_NXS_FILE_OFFSET); - grpColumns.push_back(KEY_NXS_FILE_LEN); grpColumns.push_back(KEY_NXS_META); - - // for retrieving msg offsets - mMsgOffSetColumns.push_back(KEY_MSG_ID); mMsgOffSetColumns.push_back(KEY_NXS_FILE_OFFSET); - mMsgOffSetColumns.push_back(KEY_NXS_FILE_LEN); + grpColumns.push_back(KEY_GRP_ID); grpColumns.push_back(KEY_NXS_DATA); grpColumns.push_back(KEY_NXS_DATA_LEN); + grpColumns.push_back(KEY_NXS_META); grpIdColumn.push_back(KEY_GRP_ID); @@ -213,74 +222,276 @@ RsDataService::~RsDataService(){ delete mDb; } -void RsDataService::initialise(){ +static bool moveDataFromFileToDatabase(RetroDb *db, const std::string serviceDir, const std::string &tableName, const std::string &keyId, std::list &files) +{ + bool ok = true; + + // Move message data + std::list columns; + columns.push_back(keyId); + columns.push_back(KEY_NXS_FILE_OLD); + columns.push_back(KEY_NXS_FILE_OFFSET_OLD); + columns.push_back(KEY_NXS_FILE_LEN_OLD); + + RetroCursor* c = db->sqlQuery(tableName, columns, "", ""); + + if (c) + { + bool valid = c->moveToFirst(); + + while (ok && valid){ + std::string dataFile; + c->getString(1, dataFile); + + if (!dataFile.empty()) { + bool fileOk = true; + + // first try to find the file in the service dir + if (RsDirUtil::fileExists(serviceDir + "/" + dataFile)) { + dataFile.insert(0, serviceDir + "/"); + } else if (RsDirUtil::fileExists(dataFile)) { + // use old way for backward compatibility + //TODO: can be removed later + } else { + fileOk = false; + + std::cerr << "moveDataFromFileToDatabase() cannot find file " << dataFile; + std::cerr << std::endl; + } + + if (fileOk) { + std::string id; + c->getString(0, id); + + uint32_t offset = c->getInt32(2); + uint32_t data_len = c->getInt32(3); + + char* data = new char[data_len]; + std::ifstream istrm(dataFile.c_str(), std::ios::binary); + istrm.seekg(offset, std::ios::beg); + istrm.read(data, data_len); + istrm.close(); + + ContentValue cv; + // insert new columns + cv.put(KEY_NXS_DATA, data_len, data); + cv.put(KEY_NXS_DATA_LEN, (int32_t) data_len); + // clear old columns + cv.put(KEY_NXS_FILE_OLD, ""); + cv.put(KEY_NXS_FILE_OFFSET_OLD, 0); + cv.put(KEY_NXS_FILE_LEN_OLD, 0); + + ok = db->sqlUpdate(tableName, keyId + "='" + id + "'", cv); + delete[] data; + + if (std::find(files.begin(), files.end(), dataFile) == files.end()) { + files.push_back(dataFile); + } + } + } + + valid = c->moveToNext(); + } + + delete c; + } + + return ok; +} + +void RsDataService::initialise(bool isNewDatabase) +{ + const int databaseRelease = 1; + int currentDatabaseRelease = 0; + bool ok = true; RsStackMutex stack(mDbMutex); // initialise database + if (isNewDatabase || !mDb->tableExists(DATABASE_RELEASE_TABLE_NAME)) { + // create table for database release + mDb->execSQL("CREATE TABLE " + DATABASE_RELEASE_TABLE_NAME + "(" + + KEY_DATABASE_RELEASE_ID + " INT PRIMARY KEY," + + KEY_DATABASE_RELEASE + " INT);"); + } - // create table for msg data - mDb->execSQL("CREATE TABLE IF NOT EXISTS " + MSG_TABLE_NAME + "(" + - KEY_MSG_ID + " TEXT PRIMARY KEY," + - KEY_GRP_ID + " TEXT," + - KEY_NXS_FLAGS + " INT," + - KEY_ORIG_MSG_ID + " TEXT," + - KEY_TIME_STAMP + " INT," + - KEY_NXS_IDENTITY + " TEXT," + - KEY_SIGN_SET + " BLOB," + - KEY_NXS_FILE + " TEXT,"+ - KEY_NXS_FILE_OFFSET + " INT," + - KEY_MSG_STATUS + " INT," + - KEY_CHILD_TS + " INT," + - KEY_NXS_META + " BLOB," + - KEY_MSG_THREAD_ID + " TEXT," + - KEY_MSG_PARENT_ID + " TEXT,"+ - KEY_MSG_NAME + " TEXT," + - KEY_NXS_SERV_STRING + " TEXT," + - KEY_NXS_HASH + " TEXT," + - KEY_RECV_TS + " INT," + - KEY_NXS_FILE_LEN + " INT);"); + if (isNewDatabase) { + // create table for msg data + mDb->execSQL("CREATE TABLE " + MSG_TABLE_NAME + "(" + + KEY_MSG_ID + " TEXT PRIMARY KEY," + + KEY_GRP_ID + " TEXT," + + KEY_NXS_FLAGS + " INT," + + KEY_ORIG_MSG_ID + " TEXT," + + KEY_TIME_STAMP + " INT," + + KEY_NXS_IDENTITY + " TEXT," + + KEY_SIGN_SET + " BLOB," + + KEY_NXS_DATA + " BLOB,"+ + KEY_NXS_DATA_LEN + " INT," + + KEY_MSG_STATUS + " INT," + + KEY_CHILD_TS + " INT," + + KEY_NXS_META + " BLOB," + + KEY_MSG_THREAD_ID + " TEXT," + + KEY_MSG_PARENT_ID + " TEXT,"+ + KEY_MSG_NAME + " TEXT," + + KEY_NXS_SERV_STRING + " TEXT," + + KEY_NXS_HASH + " TEXT," + + KEY_RECV_TS + " INT);"); - // create table for grp data - mDb->execSQL("CREATE TABLE IF NOT EXISTS " + GRP_TABLE_NAME + "(" + - KEY_GRP_ID + " TEXT PRIMARY KEY," + - KEY_TIME_STAMP + " INT," + - KEY_NXS_FILE + " TEXT," + - KEY_NXS_FILE_OFFSET + " INT," + - KEY_KEY_SET + " BLOB," + - KEY_NXS_FILE_LEN + " INT," + - KEY_NXS_META + " BLOB," + - KEY_GRP_NAME + " TEXT," + - KEY_GRP_LAST_POST + " INT," + - KEY_GRP_POP + " INT," + - KEY_MSG_COUNT + " INT," + - KEY_GRP_SUBCR_FLAG + " INT," + - KEY_GRP_STATUS + " INT," + - KEY_NXS_IDENTITY + " TEXT," + - KEY_ORIG_GRP_ID + " TEXT," + - KEY_NXS_SERV_STRING + " TEXT," + - KEY_NXS_FLAGS + " INT," + - KEY_GRP_AUTHEN_FLAGS + " INT," + - KEY_GRP_SIGN_FLAGS + " INT," + - KEY_GRP_CIRCLE_ID + " TEXT," + - KEY_GRP_CIRCLE_TYPE + " INT," + - KEY_GRP_INTERNAL_CIRCLE + " TEXT," + - KEY_GRP_ORIGINATOR + " TEXT," + - KEY_NXS_HASH + " TEXT," + - KEY_RECV_TS + " INT," + - KEY_PARENT_GRP_ID + " TEXT," + - KEY_GRP_REP_CUTOFF + " INT," + - KEY_SIGN_SET + " BLOB);"); + // create table for grp data + mDb->execSQL("CREATE TABLE " + GRP_TABLE_NAME + "(" + + KEY_GRP_ID + " TEXT PRIMARY KEY," + + KEY_TIME_STAMP + " INT," + + KEY_NXS_DATA + " BLOB," + + KEY_NXS_DATA_LEN + " INT," + + KEY_KEY_SET + " BLOB," + + KEY_NXS_META + " BLOB," + + KEY_GRP_NAME + " TEXT," + + KEY_GRP_LAST_POST + " INT," + + KEY_GRP_POP + " INT," + + KEY_MSG_COUNT + " INT," + + KEY_GRP_SUBCR_FLAG + " INT," + + KEY_GRP_STATUS + " INT," + + KEY_NXS_IDENTITY + " TEXT," + + KEY_ORIG_GRP_ID + " TEXT," + + KEY_NXS_SERV_STRING + " TEXT," + + KEY_NXS_FLAGS + " INT," + + KEY_GRP_AUTHEN_FLAGS + " INT," + + KEY_GRP_SIGN_FLAGS + " INT," + + KEY_GRP_CIRCLE_ID + " TEXT," + + KEY_GRP_CIRCLE_TYPE + " INT," + + KEY_GRP_INTERNAL_CIRCLE + " TEXT," + + KEY_GRP_ORIGINATOR + " TEXT," + + KEY_NXS_HASH + " TEXT," + + KEY_RECV_TS + " INT," + + KEY_PARENT_GRP_ID + " TEXT," + + KEY_GRP_REP_CUTOFF + " INT," + + KEY_SIGN_SET + " BLOB);"); - mDb->execSQL("CREATE TRIGGER IF NOT EXISTS " + GRP_LAST_POST_UPDATE_TRIGGER + - " INSERT ON " + MSG_TABLE_NAME + - std::string(" BEGIN ") + - " UPDATE " + GRP_TABLE_NAME + " SET " + KEY_GRP_LAST_POST + "= new." - + KEY_RECV_TS + " WHERE " + KEY_GRP_ID + "=new." + KEY_GRP_ID + ";" - + std::string("END;")); + mDb->execSQL("CREATE TRIGGER " + GRP_LAST_POST_UPDATE_TRIGGER + + " INSERT ON " + MSG_TABLE_NAME + + std::string(" BEGIN ") + + " UPDATE " + GRP_TABLE_NAME + " SET " + KEY_GRP_LAST_POST + "= new." + + KEY_RECV_TS + " WHERE " + KEY_GRP_ID + "=new." + KEY_GRP_ID + ";" + + std::string("END;")); - mDb->execSQL("CREATE INDEX IF NOT EXISTS " + MSG_INDEX_GRPID + " ON " + MSG_TABLE_NAME + "(" + KEY_GRP_ID + ");"); + mDb->execSQL("CREATE INDEX " + MSG_INDEX_GRPID + " ON " + MSG_TABLE_NAME + "(" + KEY_GRP_ID + ");"); + + // Insert release, no need to upgrade + ContentValue cv; + cv.put(KEY_DATABASE_RELEASE_ID, KEY_DATABASE_RELEASE_ID_VALUE); + cv.put(KEY_DATABASE_RELEASE, databaseRelease); + mDb->sqlInsert(DATABASE_RELEASE_TABLE_NAME, "", cv); + + currentDatabaseRelease = databaseRelease; + } else { + // check release + + { + // try to select the release + std::list columns; + columns.push_back(KEY_DATABASE_RELEASE); + + std::string where; + rs_sprintf(where, "%s=%d", KEY_DATABASE_RELEASE_ID.c_str(), KEY_DATABASE_RELEASE_ID_VALUE); + + RetroCursor* c = mDb->sqlQuery(DATABASE_RELEASE_TABLE_NAME, columns, where, ""); + if (c) { + ok = c->moveToFirst(); + + if (ok) { + currentDatabaseRelease = c->getInt32(0); + } + delete c; + + if (!ok) { + // No record found ... insert the record + ContentValue cv; + cv.put(KEY_DATABASE_RELEASE_ID, KEY_DATABASE_RELEASE_ID_VALUE); + cv.put(KEY_DATABASE_RELEASE, currentDatabaseRelease); + ok = mDb->sqlInsert(DATABASE_RELEASE_TABLE_NAME, "", cv); + } + } else { + ok = false; + } + } + + // Release 1 + int newRelease = 1; + if (ok && currentDatabaseRelease < newRelease) { + // Update database + std::list files; + + ok = startReleaseUpdate(newRelease); + + // Move data in files into database + ok = ok && mDb->execSQL("ALTER TABLE " + GRP_TABLE_NAME + " ADD COLUMN " + KEY_NXS_DATA + " BLOB;"); + ok = ok && mDb->execSQL("ALTER TABLE " + GRP_TABLE_NAME + " ADD COLUMN " + KEY_NXS_DATA_LEN + " INT;"); + ok = ok && mDb->execSQL("ALTER TABLE " + MSG_TABLE_NAME + " ADD COLUMN " + KEY_NXS_DATA + " BLOB;"); + ok = ok && mDb->execSQL("ALTER TABLE " + MSG_TABLE_NAME + " ADD COLUMN " + KEY_NXS_DATA_LEN + " INT;"); + + ok = ok && moveDataFromFileToDatabase(mDb, mServiceDir, GRP_TABLE_NAME, KEY_GRP_ID, files); + ok = ok && moveDataFromFileToDatabase(mDb, mServiceDir, MSG_TABLE_NAME, KEY_MSG_ID, files); + +// SQLite doesn't support DROP COLUMN +// ok = ok && mDb->execSQL("ALTER TABLE " + GRP_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_OLD + ";"); +// ok = ok && mDb->execSQL("ALTER TABLE " + GRP_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_OFFSET_OLD + ";"); +// ok = ok && mDb->execSQL("ALTER TABLE " + GRP_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_LEN_OLD + ";"); +// ok = ok && mDb->execSQL("ALTER TABLE " + MSG_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_OLD + ";"); +// ok = ok && mDb->execSQL("ALTER TABLE " + MSG_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_OFFSET_OLD + ";"); +// ok = ok && mDb->execSQL("ALTER TABLE " + MSG_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_LEN_OLD + ";"); + + ok = finishReleaseUpdate(newRelease, ok); + if (ok) { + // Remove transfered files + std::list::const_iterator file; + for (file = files.begin(); file != files.end(); ++file) { + remove(file->c_str()); + } + currentDatabaseRelease = newRelease; + } + } + } + + if (ok) { + std::cerr << "Database " << mDbName << " release " << currentDatabaseRelease << " successfully initialised." << std::endl; + } else { + std::cerr << "Database " << mDbName << " initialisation failed." << std::endl; + } +} + +bool RsDataService::startReleaseUpdate(int release) +{ + // Update database + std::cerr << "Database " << mDbName << " update to release " << release << "." << std::endl; + + return mDb->beginTransaction(); +} + +bool RsDataService::finishReleaseUpdate(int release, bool result) +{ + if (result) { + std::string where; + rs_sprintf(where, "%s=%d", KEY_DATABASE_RELEASE_ID.c_str(), KEY_DATABASE_RELEASE_ID_VALUE); + + ContentValue cv; + cv.put(KEY_DATABASE_RELEASE, release); + result = mDb->sqlUpdate(DATABASE_RELEASE_TABLE_NAME, where, cv); + } + + if (result) { + result = mDb->commitTransaction(); + } else { + result = mDb->rollbackTransaction(); + } + + if (result) { + std::cerr << "Database " << mDbName << " successfully updated to release " << release << "." << std::endl; + } else { + std::cerr << "Database " << mDbName << " update to release " << release << "failed." << std::endl; + } + + return result; } RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c) @@ -318,7 +529,7 @@ RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c) grpMeta->mPublishTs = c.getInt32(COL_TIME_STAMP); grpMeta->mGroupFlags = c.getInt32(COL_NXS_FLAGS); - grpMeta->mGrpSize = c.getInt32(COL_NXS_FILE_LEN); + grpMeta->mGrpSize = c.getInt32(COL_GRP_DATA_LEN); offset = 0; @@ -387,41 +598,12 @@ RsNxsGrp* RsDataService::locked_getGroup(RetroCursor &c) grp->meta.GetTlv(data, data_len, &offset); } - /* now retrieve grp data from file */ - std::string grpFile; - c.getString(COL_NXS_FILE, grpFile); - ok &= !grpFile.empty(); - + /* now retrieve grp data */ + offset = 0; data_len = 0; if(ok){ - - data_len = c.getInt32(COL_NXS_FILE_LEN); - offset = c.getInt32(COL_NXS_FILE_OFFSET); - - // first try to find the file in the service dir - if (RsDirUtil::fileExists(mServiceDir + "/" + grpFile)) { - grpFile.insert(0, mServiceDir + "/"); - } else if (RsDirUtil::fileExists(grpFile)) { - // use old way for backward compatibility - //TODO: can be removed later - } else { - ok = false; - -//#ifdef RS_DATA_SERVICE_DEBUG - std::cerr << "RsDataService::locked_getGroup() cannot find group file " << grpFile; - std::cerr << std::endl; -//#endif - } - - if (ok) { - char grp_data[data_len]; - std::ifstream istrm(grpFile.c_str(), std::ios::binary); - istrm.seekg(offset, std::ios::beg); - istrm.read(grp_data, data_len); - - istrm.close(); - offset = 0; - ok &= grp->grp.GetTlv(grp_data, data_len, &offset); - } + data = (char*)c.getData(COL_NXS_DATA, data_len); + if(data) + ok &= grp->grp.GetTlv(data, data_len, &offset); } if(ok) @@ -466,7 +648,7 @@ RsGxsMsgMetaData* RsDataService::locked_getMsgMeta(RetroCursor &c) offset = 0; data = (char*)c.getData(COL_SIGN_SET, data_len); msgMeta->signSet.GetTlv(data, data_len, &offset); - msgMeta->mMsgSize = c.getInt32(COL_NXS_FILE_LEN); + msgMeta->mMsgSize = c.getInt32(COL_MSG_DATA_LEN); msgMeta->mMsgFlags = c.getInt32(COL_NXS_FLAGS); msgMeta->mPublishTs = c.getInt32(COL_TIME_STAMP); @@ -517,41 +699,12 @@ RsNxsMsg* RsDataService::locked_getMessage(RetroCursor &c) msg->meta.GetTlv(data, data_len, &offset); } - /* now retrieve grp data from file */ - std::string msgFile; - c.getString(COL_NXS_FILE, msgFile); - offset = c.getInt32(COL_NXS_FILE_OFFSET); - data_len = c.getInt32(COL_NXS_FILE_LEN); - ok &= !msgFile.empty(); - + /* now retrieve msg data */ + offset = 0; data_len = 0; if(ok){ - - // first try to find the file in the service dir - if (RsDirUtil::fileExists(mServiceDir + "/" + msgFile)) { - msgFile.insert(0, mServiceDir + "/"); - } else if (RsDirUtil::fileExists(msgFile)) { - // use old way for backward compatibility - //TODO: can be removed later - } else { - ok = false; - -//#ifdef RS_DATA_SERVICE_DEBUG - std::cerr << "RsDataService::locked_getMessage() cannot find message file " << msgFile; - std::cerr << std::endl; -//#endif - } - - if (ok) { - char* msg_data = new char[data_len]; - std::ifstream istrm(msgFile.c_str(), std::ios::binary); - istrm.seekg(offset, std::ios::beg); - istrm.read(msg_data, data_len); - - istrm.close(); - offset = 0; - ok &= msg->msg.GetTlv(msg_data, data_len, &offset); - delete[] msg_data; - } + data = (char*)c.getData(COL_NXS_DATA, data_len); + if(data) + ok &= msg->msg.GetTlv(data, data_len, &offset); } if(ok) @@ -592,18 +745,15 @@ int RsDataService::storeMessage(std::map &msg) continue; } - // create or access file in binary - std::string filename = msgPtr->grpId.toStdString() + "-msgs"; - std::string msgFile = mServiceDir + "/" + filename; - std::fstream ostrm(msgFile.c_str(), std::ios::binary | std::ios::app | std::ios::out); - ostrm.seekg(0, std::ios::end); // go to end to append - uint32_t offset = ostrm.tellg(); // get fill offset - ContentValue cv; - cv.put(KEY_NXS_FILE_OFFSET, (int32_t)offset); - cv.put(KEY_NXS_FILE, filename); - cv.put(KEY_NXS_FILE_LEN, (int32_t)msgPtr->msg.TlvSize()); + uint32_t dataLen = msgPtr->msg.TlvSize(); + char msgData[dataLen]; + uint32_t offset = 0; + msgPtr->msg.SetTlv(msgData, dataLen, &offset); + cv.put(KEY_NXS_DATA, dataLen, msgData); + + cv.put(KEY_NXS_DATA_LEN, (int32_t)dataLen); cv.put(KEY_MSG_ID, msgMetaPtr->mMsgId.toStdString()); cv.put(KEY_GRP_ID, msgMetaPtr->mGroupId.toStdString()); cv.put(KEY_NXS_SERV_STRING, msgMetaPtr->mServiceString); @@ -635,13 +785,6 @@ int RsDataService::storeMessage(std::map &msg) cv.put(KEY_MSG_STATUS, (int32_t)msgMetaPtr->mMsgStatus); cv.put(KEY_CHILD_TS, (int32_t)msgMetaPtr->mChildTs); - offset = 0; - char* msgData = new char[msgPtr->msg.TlvSize()]; - msgPtr->msg.SetTlv(msgData, msgPtr->msg.TlvSize(), &offset); - ostrm.write(msgData, msgPtr->msg.TlvSize()); - ostrm.close(); - delete[] msgData; - if (!mDb->sqlInsert(MSG_TABLE_NAME, "", cv)) { std::cerr << "RsDataService::storeMessage() sqlInsert Failed"; @@ -703,22 +846,21 @@ int RsDataService::storeGroup(std::map &grp) std::cerr << std::endl; #endif - std::string filename = grpPtr->grpId.toStdString(); - std::string grpFile = mServiceDir + "/" + filename; - std::fstream ostrm(grpFile.c_str(), std::ios::binary | std::ios::app | std::ios::out); - ostrm.seekg(0, std::ios::end); // go to end to append - uint32_t offset = ostrm.tellg(); // get fill offset - /*! - * STORE file offset, file length, file name, + * STORE data, data len, * grpId, flags, publish time stamp, identity, * id signature, admin signatue, key set, last posting ts * and meta data **/ ContentValue cv; - cv.put(KEY_NXS_FILE_OFFSET, (int32_t)offset); - cv.put(KEY_NXS_FILE_LEN, (int32_t)grpPtr->grp.TlvSize()); - cv.put(KEY_NXS_FILE, filename); + + uint32_t dataLen = grpPtr->grp.TlvSize(); + char grpData[dataLen]; + uint32_t offset = 0; + grpPtr->grp.SetTlv(grpData, dataLen, &offset); + cv.put(KEY_NXS_DATA, dataLen, grpData); + + cv.put(KEY_NXS_DATA_LEN, (int32_t) dataLen); cv.put(KEY_GRP_ID, grpPtr->grpId.toStdString()); cv.put(KEY_GRP_NAME, grpMetaPtr->mGroupName); cv.put(KEY_ORIG_GRP_ID, grpMetaPtr->mOrigGrpId.toStdString()); @@ -754,12 +896,6 @@ int RsDataService::storeGroup(std::map &grp) cv.put(KEY_GRP_STATUS, (int32_t)grpMetaPtr->mGroupStatus); cv.put(KEY_GRP_LAST_POST, (int32_t)grpMetaPtr->mLastPost); - offset = 0; - char grpData[grpPtr->grp.TlvSize()]; - grpPtr->grp.SetTlv(grpData, grpPtr->grp.TlvSize(), &offset); - ostrm.write(grpData, grpPtr->grp.TlvSize()); - ostrm.close(); - if (!mDb->sqlInsert(GRP_TABLE_NAME, "", cv)) { std::cerr << "RsDataService::storeGroup() sqlInsert Failed"; @@ -802,21 +938,20 @@ int RsDataService::updateGroup(std::map &grp) // if data is larger than max item size do not add if(!validSize(grpPtr)) continue; - std::string filename = grpPtr->grpId.toStdString(); - std::string grpFile = mServiceDir + "/" + filename; - std::ofstream ostrm(grpFile.c_str(), std::ios::binary | std::ios::trunc); - uint32_t offset = 0; // get file offset - /*! - * STORE file offset, file length, file name, + * STORE data, data len, * grpId, flags, publish time stamp, identity, * id signature, admin signatue, key set, last posting ts * and meta data **/ ContentValue cv; - cv.put(KEY_NXS_FILE_OFFSET, (int32_t)offset); - cv.put(KEY_NXS_FILE_LEN, (int32_t)grpPtr->grp.TlvSize()); - cv.put(KEY_NXS_FILE, filename); + uint32_t dataLen = grpPtr->grp.TlvSize(); + char grpData[dataLen]; + uint32_t offset = 0; + grpPtr->grp.SetTlv(grpData, dataLen, &offset); + cv.put(KEY_NXS_DATA, dataLen, grpData); + + cv.put(KEY_NXS_DATA_LEN, (int32_t) dataLen); cv.put(KEY_GRP_ID, grpPtr->grpId.toStdString()); cv.put(KEY_GRP_NAME, grpMetaPtr->mGroupName); cv.put(KEY_ORIG_GRP_ID, grpMetaPtr->mOrigGrpId.toStdString()); @@ -849,12 +984,6 @@ int RsDataService::updateGroup(std::map &grp) cv.put(KEY_GRP_STATUS, (int32_t)grpMetaPtr->mGroupStatus); cv.put(KEY_GRP_LAST_POST, (int32_t)grpMetaPtr->mLastPost); - offset = 0; - char grpData[grpPtr->grp.TlvSize()]; - grpPtr->grp.SetTlv(grpData, grpPtr->grp.TlvSize(), &offset); - ostrm.write(grpData, grpPtr->grp.TlvSize()); - ostrm.close(); - mDb->sqlUpdate(GRP_TABLE_NAME, "grpId='" + grpPtr->grpId.toStdString() + "'", cv); } // finish transaction @@ -1368,13 +1497,14 @@ int RsDataService::resetDataStore() } mDb->execSQL("DROP INDEX " + MSG_INDEX_GRPID); + mDb->execSQL("DROP TABLE " + DATABASE_RELEASE_TABLE_NAME); mDb->execSQL("DROP TABLE " + MSG_TABLE_NAME); mDb->execSQL("DROP TABLE " + GRP_TABLE_NAME); mDb->execSQL("DROP TRIGGER " + GRP_LAST_POST_UPDATE_TRIGGER); } // recreate database - initialise(); + initialise(true); return 1; } @@ -1395,109 +1525,21 @@ int RsDataService::updateMessageMetaData(MsgLocMetaData &metaData) + "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", metaData.val) ? 1 : 0; } -MsgOffset offSetAccum(const MsgOffset& x, const MsgOffset& y) -{ - MsgOffset m; - m.msgLen = y.msgLen + x.msgLen; - return m; -} - int RsDataService::removeMsgs(const GxsMsgReq& msgIds) { RsStackMutex stack(mDbMutex); - // for each group - // get for all msgs their offsets and lengths - // for message not contained in msg id vector - // store their data file segments in buffer - // then recalculate the retained messages' - // new offsets, update db with new offsets - // replace old msg file with new file - // remove messages that were not retained from - // db - GxsMsgReq::const_iterator mit = msgIds.begin(); - for(; mit != msgIds.end(); ++mit) { - MsgUpdates updates; const std::vector& msgIdV = mit->second; const RsGxsGroupId& grpId = mit->first; - GxsMsgReq reqIds; - reqIds.insert(std::make_pair(grpId, std::vector() )); - - // can get offsets for each file - std::vector msgOffsets; - locked_getMessageOffsets(grpId, msgOffsets); - - std::string oldFileName = mServiceDir + "/" + grpId.toStdString() + "-msgs"; - std::string newFileName = mServiceDir + "/" + grpId.toStdString() + "-msgs-temp"; - std::ifstream in(oldFileName.c_str(), std::ios::binary); - std::vector dataBuff, newBuffer; - - std::vector::iterator vit = msgOffsets.begin(); - - uint32_t maxSize = 0;// größe aller msgs, newbuf könnte aber kleiner sein, weil msgs weggehen - for(; vit != msgOffsets.end(); ++vit) - maxSize += vit->msgLen; - - // may be preferable to determine file len reality - // from file? corrupt db? - dataBuff.reserve(maxSize);// dataBuff.resize(maxSize); - newBuffer.reserve(maxSize);// newBuffer.resize(maxSize); - - dataBuff.insert(dataBuff.end(), - std::istreambuf_iterator(in), - std::istreambuf_iterator()); - - in.close(); - uint32_t newOffset = 0;// am anfang der liste ist offset=0, jetzt gehen wir die msgs liste durch - for(std::vector::size_type i = 0; i < msgOffsets.size(); ++i) - { - const MsgOffset& m = msgOffsets[i]; - - //uint32_t newOffset = 0;//hier ist es zu spät, offset muss hochgezählt werden - if(std::find(msgIdV.begin(), msgIdV.end(), m.msgId) == msgIdV.end()) - { - MsgUpdate up; - - uint32_t msgLen = m.msgLen; - - up.msgId = m.msgId; - up.cv.put(KEY_NXS_FILE_OFFSET, (int32_t)newOffset); - - newBuffer.insert(newBuffer.end(), dataBuff.begin()+m.msgOffset, - dataBuff.begin()+m.msgOffset+m.msgLen); - - newOffset += msgLen; - - up.cv.put(KEY_NXS_FILE_LEN, (int32_t)msgLen); - - // add msg update - updates[grpId].push_back(up); - } - } - - std::ofstream out(newFileName.c_str(), std::ios::binary); - - std::copy(newBuffer.begin(), newBuffer.end(), - std::ostreambuf_iterator(out)); - - out.close(); - - // now update the new positions in db - locked_updateMessageEntries(updates); - - // then delete removed messages + // delete messages GxsMsgReq msgsToDelete; msgsToDelete[grpId] = msgIdV; locked_removeMessageEntries(msgsToDelete); - - // now replace old file location with new file - remove(oldFileName.c_str()); - RsDirUtil::renameFile(newFileName, oldFileName); } return 1; @@ -1508,17 +1550,6 @@ int RsDataService::removeGroups(const std::vector &grpIds) RsStackMutex stack(mDbMutex); - // the grp id is the group file name - // first remove file then remove group - // from db - - std::vector::const_iterator vit = grpIds.begin(); - for(; vit != grpIds.end(); ++vit) - { - const std::string grpFileName = mServiceDir + "/" + (*vit).toStdString(); - remove(grpFileName.c_str()); - } - locked_removeGroupEntries(grpIds); return 1; @@ -1605,33 +1636,6 @@ int RsDataService::retrieveMsgIds(const RsGxsGroupId& grpId, RsGxsMessageId::std } -bool RsDataService::locked_updateMessageEntries(const MsgUpdates& updates) -{ - // start a transaction - bool ret = mDb->beginTransaction(); - - MsgUpdates::const_iterator mit = updates.begin(); - - for(; mit != updates.end(); ++mit) - { - - const RsGxsGroupId& grpId = mit->first; - const std::vector& updateV = mit->second; - std::vector::const_iterator vit = updateV.begin(); - - for(; vit != updateV.end(); ++vit) - { - const MsgUpdate& update = *vit; - mDb->sqlUpdate(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() - + "' AND " + KEY_MSG_ID + "='" + update.msgId.toStdString() + "'", update.cv); - } - } - - ret &= mDb->commitTransaction(); - - return ret; -} - bool RsDataService::locked_removeMessageEntries(const GxsMsgReq& msgIds) { // start a transaction @@ -1676,38 +1680,6 @@ bool RsDataService::locked_removeGroupEntries(const std::vector& g return ret; } -void RsDataService::locked_getMessageOffsets(const RsGxsGroupId& grpId, std::vector& offsets) -{ - - RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgOffSetColumns, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", ""); - - if(c) - { - bool valid = c->moveToFirst(); - - while(valid) - { - RsGxsMessageId msgId; - int32_t msgLen; - int32_t msgOffSet; - std::string temp; - c->getString(0, temp); - msgId = RsGxsMessageId(temp); - msgOffSet = c->getInt32(1); - msgLen = c->getInt32(2); - - MsgOffset offset; - offset.msgId = msgId; - offset.msgLen = msgLen; - offset.msgOffset = msgOffSet; - offsets.push_back(offset); - - valid = c->moveToNext(); - } - delete c; - } -} - uint32_t RsDataService::cacheSize() const { return 0; } diff --git a/libretroshare/src/gxs/rsdataservice.h b/libretroshare/src/gxs/rsdataservice.h index aafa2eb45..91b4252e9 100644 --- a/libretroshare/src/gxs/rsdataservice.h +++ b/libretroshare/src/gxs/rsdataservice.h @@ -29,15 +29,6 @@ #include "gxs/rsgds.h" #include "util/retrodb.h" -class MsgOffset -{ -public: - - MsgOffset() : msgOffset(0), msgLen(0) {} - RsGxsMessageId msgId; - uint32_t msgOffset, msgLen; -}; - class MsgUpdate { public: @@ -233,8 +224,9 @@ private: /*! * Creates an sql database and its associated file * also creates the message and groups table + * @param isNewDatabase is new database */ - void initialise(); + void initialise(bool isNewDatabase); /*! * Remove entries for data base @@ -243,19 +235,21 @@ private: bool locked_removeMessageEntries(const GxsMsgReq& msgIds); bool locked_removeGroupEntries(const std::vector& grpIds); - typedef std::map > MsgUpdates; +private: + /*! + * Start release update + * @param release + * @return true/false + */ + bool startReleaseUpdate(int release); /*! - * Update messages entries with new values - * @param msgIds - * @param cv contains values to update message entries with + * Finish release update + * @param release + * @param result + * @return true/false */ - bool locked_updateMessageEntries(const MsgUpdates& updates); - - -private: - - void locked_getMessageOffsets(const RsGxsGroupId& grpId, std::vector& msgOffsets); + bool finishReleaseUpdate(int release, bool result); private: @@ -263,7 +257,6 @@ private: std::list msgColumns; std::list msgMetaColumns; - std::list mMsgOffSetColumns; std::list mMsgIdColumn; std::list grpColumns;