Moved gxs data from files into database

- Added update to RsDataService
- Added new table "DATABASE_RELEASE" to database

Advantages:
- Better performance because of the removed additional file access
- Transaction safety
- All groups and messages are stored in the database

Attention:
Please make a backup of your data folder before you try this version.
The first start of the application will update the database and can take a little bit longer. Once the database was converted the messages cannot be read by older versions of the application.
This commit is contained in:
thunder2 2015-08-16 13:27:04 +02:00
parent 6893c5867c
commit f30ed24a4a
2 changed files with 344 additions and 379 deletions

View File

@ -38,18 +38,19 @@
#endif
#include "rsdataservice.h"
#include "util/rsstring.h"
#define MSG_TABLE_NAME std::string("MESSAGES")
#define GRP_TABLE_NAME std::string("GROUPS")
#define DATABASE_RELEASE_TABLE_NAME std::string("DATABASE_RELEASE")
#define GRP_LAST_POST_UPDATE_TRIGGER std::string("LAST_POST_UPDATE")
#define MSG_INDEX_GRPID std::string("INDEX_MESSAGES_GRPID")
// generic
#define KEY_NXS_FILE std::string("nxsFile")
#define KEY_NXS_FILE_OFFSET std::string("fileOffset")
#define KEY_NXS_FILE_LEN std::string("nxsFileLen")
#define KEY_NXS_DATA std::string("nxsData")
#define KEY_NXS_DATA_LEN std::string("nxsDataLen")
#define KEY_NXS_IDENTITY std::string("identity")
#define KEY_GRP_ID std::string("grpId")
#define KEY_ORIG_GRP_ID std::string("origGrpId")
@ -62,6 +63,10 @@
#define KEY_NXS_HASH std::string("hash")
#define KEY_RECV_TS std::string("recv_time_stamp")
// remove later
#define KEY_NXS_FILE_OLD std::string("nxsFile")
#define KEY_NXS_FILE_OFFSET_OLD std::string("fileOffset")
#define KEY_NXS_FILE_LEN_OLD std::string("nxsFileLen")
// grp table columns
#define KEY_KEY_SET std::string("keySet")
@ -93,6 +98,10 @@
#define KEY_MSG_STATUS std::string("msgStatus")
#define KEY_CHILD_TS std::string("childTs")
// database release columns
#define KEY_DATABASE_RELEASE_ID std::string("id")
#define KEY_DATABASE_RELEASE_ID_VALUE 1
#define KEY_DATABASE_RELEASE std::string("release")
@ -100,11 +109,10 @@
// generic
#define COL_ACT_GROUP_ID 0
#define COL_NXS_FILE 1
#define COL_NXS_FILE_OFFSET 2
#define COL_NXS_FILE_LEN 3
#define COL_META_DATA 4
#define COL_ACT_MSG_ID 5
#define COL_NXS_DATA 1
#define COL_NXS_DATA_LEN 2
#define COL_META_DATA 3
#define COL_ACT_MSG_ID 4
/*** meta column numbers ***/
@ -128,6 +136,7 @@
#define COL_PARENT_GRP_ID 21
#define COL_GRP_RECV_TS 22
#define COL_GRP_REP_CUTOFF 23
#define COL_GRP_DATA_LEN 24
// msg col numbers
@ -140,6 +149,7 @@
#define COL_MSG_NAME 12
#define COL_MSG_SERV_STRING 13
#define COL_MSG_RECV_TS 14
#define COL_MSG_DATA_LEN 15
// generic meta shared col numbers
#define COL_GRP_ID 0
@ -161,10 +171,13 @@ const uint32_t RsGeneralDataService::GXS_MAX_ITEM_SIZE = 1572864; // 1.5 Mbytes
RsDataService::RsDataService(const std::string &serviceDir, const std::string &dbName, uint16_t serviceType,
RsGxsSearchModule * /* mod */, const std::string& key)
: RsGeneralDataService(), mDbMutex("RsDataService"), mServiceDir(serviceDir), mDbName(dbName), mDbPath(mServiceDir + "/" + dbName), mServType(serviceType),
mDb( new RetroDb(mDbPath, RetroDb::OPEN_READWRITE_CREATE, key)) {
: RsGeneralDataService(), mDbMutex("RsDataService"), mServiceDir(serviceDir), mDbName(dbName), mDbPath(mServiceDir + "/" + dbName), mServType(serviceType), mDb(NULL)
{
bool isNewDatabase = !RsDirUtil::fileExists(mDbPath);
initialise();
mDb = new RetroDb(mDbPath, RetroDb::OPEN_READWRITE_CREATE, key);
initialise(isNewDatabase);
// for retrieving msg meta
msgMetaColumns.push_back(KEY_GRP_ID); msgMetaColumns.push_back(KEY_TIME_STAMP); msgMetaColumns.push_back(KEY_NXS_FLAGS);
@ -172,10 +185,11 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d
msgMetaColumns.push_back(KEY_MSG_ID); msgMetaColumns.push_back(KEY_ORIG_MSG_ID); msgMetaColumns.push_back(KEY_MSG_STATUS);
msgMetaColumns.push_back(KEY_CHILD_TS); msgMetaColumns.push_back(KEY_MSG_PARENT_ID); msgMetaColumns.push_back(KEY_MSG_THREAD_ID);
msgMetaColumns.push_back(KEY_MSG_NAME); msgMetaColumns.push_back(KEY_NXS_SERV_STRING); msgMetaColumns.push_back(KEY_RECV_TS);
msgMetaColumns.push_back(KEY_NXS_DATA_LEN);
// for retrieving actual data
msgColumns.push_back(KEY_GRP_ID); msgColumns.push_back(KEY_NXS_FILE); msgColumns.push_back(KEY_NXS_FILE_OFFSET);
msgColumns.push_back(KEY_NXS_FILE_LEN); msgColumns.push_back(KEY_NXS_META); msgColumns.push_back(KEY_MSG_ID);
msgColumns.push_back(KEY_GRP_ID); msgColumns.push_back(KEY_NXS_DATA); msgColumns.push_back(KEY_NXS_DATA_LEN);
msgColumns.push_back(KEY_NXS_META); msgColumns.push_back(KEY_MSG_ID);
// for retrieving grp meta data
grpMetaColumns.push_back(KEY_GRP_ID); grpMetaColumns.push_back(KEY_TIME_STAMP); grpMetaColumns.push_back(KEY_NXS_FLAGS);
@ -186,16 +200,11 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d
grpMetaColumns.push_back(KEY_GRP_SIGN_FLAGS); grpMetaColumns.push_back(KEY_GRP_CIRCLE_ID); grpMetaColumns.push_back(KEY_GRP_CIRCLE_TYPE);
grpMetaColumns.push_back(KEY_GRP_INTERNAL_CIRCLE); grpMetaColumns.push_back(KEY_GRP_ORIGINATOR);
grpMetaColumns.push_back(KEY_GRP_AUTHEN_FLAGS); grpMetaColumns.push_back(KEY_PARENT_GRP_ID); grpMetaColumns.push_back(KEY_RECV_TS);
grpMetaColumns.push_back(KEY_GRP_REP_CUTOFF);
grpMetaColumns.push_back(KEY_GRP_REP_CUTOFF); grpMetaColumns.push_back(KEY_NXS_DATA_LEN);
// for retrieving actual grp data
grpColumns.push_back(KEY_GRP_ID); grpColumns.push_back(KEY_NXS_FILE); grpColumns.push_back(KEY_NXS_FILE_OFFSET);
grpColumns.push_back(KEY_NXS_FILE_LEN); grpColumns.push_back(KEY_NXS_META);
// for retrieving msg offsets
mMsgOffSetColumns.push_back(KEY_MSG_ID); mMsgOffSetColumns.push_back(KEY_NXS_FILE_OFFSET);
mMsgOffSetColumns.push_back(KEY_NXS_FILE_LEN);
grpColumns.push_back(KEY_GRP_ID); grpColumns.push_back(KEY_NXS_DATA); grpColumns.push_back(KEY_NXS_DATA_LEN);
grpColumns.push_back(KEY_NXS_META);
grpIdColumn.push_back(KEY_GRP_ID);
@ -213,15 +222,103 @@ RsDataService::~RsDataService(){
delete mDb;
}
void RsDataService::initialise(){
static bool moveDataFromFileToDatabase(RetroDb *db, const std::string serviceDir, const std::string &tableName, const std::string &keyId, std::list<std::string> &files)
{
bool ok = true;
// Move message data
std::list<std::string> columns;
columns.push_back(keyId);
columns.push_back(KEY_NXS_FILE_OLD);
columns.push_back(KEY_NXS_FILE_OFFSET_OLD);
columns.push_back(KEY_NXS_FILE_LEN_OLD);
RetroCursor* c = db->sqlQuery(tableName, columns, "", "");
if (c)
{
bool valid = c->moveToFirst();
while (ok && valid){
std::string dataFile;
c->getString(1, dataFile);
if (!dataFile.empty()) {
bool fileOk = true;
// first try to find the file in the service dir
if (RsDirUtil::fileExists(serviceDir + "/" + dataFile)) {
dataFile.insert(0, serviceDir + "/");
} else if (RsDirUtil::fileExists(dataFile)) {
// use old way for backward compatibility
//TODO: can be removed later
} else {
fileOk = false;
std::cerr << "moveDataFromFileToDatabase() cannot find file " << dataFile;
std::cerr << std::endl;
}
if (fileOk) {
std::string id;
c->getString(0, id);
uint32_t offset = c->getInt32(2);
uint32_t data_len = c->getInt32(3);
char* data = new char[data_len];
std::ifstream istrm(dataFile.c_str(), std::ios::binary);
istrm.seekg(offset, std::ios::beg);
istrm.read(data, data_len);
istrm.close();
ContentValue cv;
// insert new columns
cv.put(KEY_NXS_DATA, data_len, data);
cv.put(KEY_NXS_DATA_LEN, (int32_t) data_len);
// clear old columns
cv.put(KEY_NXS_FILE_OLD, "");
cv.put(KEY_NXS_FILE_OFFSET_OLD, 0);
cv.put(KEY_NXS_FILE_LEN_OLD, 0);
ok = db->sqlUpdate(tableName, keyId + "='" + id + "'", cv);
delete[] data;
if (std::find(files.begin(), files.end(), dataFile) == files.end()) {
files.push_back(dataFile);
}
}
}
valid = c->moveToNext();
}
delete c;
}
return ok;
}
void RsDataService::initialise(bool isNewDatabase)
{
const int databaseRelease = 1;
int currentDatabaseRelease = 0;
bool ok = true;
RsStackMutex stack(mDbMutex);
// initialise database
if (isNewDatabase || !mDb->tableExists(DATABASE_RELEASE_TABLE_NAME)) {
// create table for database release
mDb->execSQL("CREATE TABLE " + DATABASE_RELEASE_TABLE_NAME + "(" +
KEY_DATABASE_RELEASE_ID + " INT PRIMARY KEY," +
KEY_DATABASE_RELEASE + " INT);");
}
if (isNewDatabase) {
// create table for msg data
mDb->execSQL("CREATE TABLE IF NOT EXISTS " + MSG_TABLE_NAME + "(" +
mDb->execSQL("CREATE TABLE " + MSG_TABLE_NAME + "(" +
KEY_MSG_ID + " TEXT PRIMARY KEY," +
KEY_GRP_ID + " TEXT," +
KEY_NXS_FLAGS + " INT," +
@ -229,8 +326,8 @@ void RsDataService::initialise(){
KEY_TIME_STAMP + " INT," +
KEY_NXS_IDENTITY + " TEXT," +
KEY_SIGN_SET + " BLOB," +
KEY_NXS_FILE + " TEXT,"+
KEY_NXS_FILE_OFFSET + " INT," +
KEY_NXS_DATA + " BLOB,"+
KEY_NXS_DATA_LEN + " INT," +
KEY_MSG_STATUS + " INT," +
KEY_CHILD_TS + " INT," +
KEY_NXS_META + " BLOB," +
@ -239,17 +336,15 @@ void RsDataService::initialise(){
KEY_MSG_NAME + " TEXT," +
KEY_NXS_SERV_STRING + " TEXT," +
KEY_NXS_HASH + " TEXT," +
KEY_RECV_TS + " INT," +
KEY_NXS_FILE_LEN + " INT);");
KEY_RECV_TS + " INT);");
// create table for grp data
mDb->execSQL("CREATE TABLE IF NOT EXISTS " + GRP_TABLE_NAME + "(" +
mDb->execSQL("CREATE TABLE " + GRP_TABLE_NAME + "(" +
KEY_GRP_ID + " TEXT PRIMARY KEY," +
KEY_TIME_STAMP + " INT," +
KEY_NXS_FILE + " TEXT," +
KEY_NXS_FILE_OFFSET + " INT," +
KEY_NXS_DATA + " BLOB," +
KEY_NXS_DATA_LEN + " INT," +
KEY_KEY_SET + " BLOB," +
KEY_NXS_FILE_LEN + " INT," +
KEY_NXS_META + " BLOB," +
KEY_GRP_NAME + " TEXT," +
KEY_GRP_LAST_POST + " INT," +
@ -273,14 +368,130 @@ void RsDataService::initialise(){
KEY_GRP_REP_CUTOFF + " INT," +
KEY_SIGN_SET + " BLOB);");
mDb->execSQL("CREATE TRIGGER IF NOT EXISTS " + GRP_LAST_POST_UPDATE_TRIGGER +
mDb->execSQL("CREATE TRIGGER " + GRP_LAST_POST_UPDATE_TRIGGER +
" INSERT ON " + MSG_TABLE_NAME +
std::string(" BEGIN ") +
" UPDATE " + GRP_TABLE_NAME + " SET " + KEY_GRP_LAST_POST + "= new."
+ KEY_RECV_TS + " WHERE " + KEY_GRP_ID + "=new." + KEY_GRP_ID + ";"
+ std::string("END;"));
mDb->execSQL("CREATE INDEX IF NOT EXISTS " + MSG_INDEX_GRPID + " ON " + MSG_TABLE_NAME + "(" + KEY_GRP_ID + ");");
mDb->execSQL("CREATE INDEX " + MSG_INDEX_GRPID + " ON " + MSG_TABLE_NAME + "(" + KEY_GRP_ID + ");");
// Insert release, no need to upgrade
ContentValue cv;
cv.put(KEY_DATABASE_RELEASE_ID, KEY_DATABASE_RELEASE_ID_VALUE);
cv.put(KEY_DATABASE_RELEASE, databaseRelease);
mDb->sqlInsert(DATABASE_RELEASE_TABLE_NAME, "", cv);
currentDatabaseRelease = databaseRelease;
} else {
// check release
{
// try to select the release
std::list<std::string> columns;
columns.push_back(KEY_DATABASE_RELEASE);
std::string where;
rs_sprintf(where, "%s=%d", KEY_DATABASE_RELEASE_ID.c_str(), KEY_DATABASE_RELEASE_ID_VALUE);
RetroCursor* c = mDb->sqlQuery(DATABASE_RELEASE_TABLE_NAME, columns, where, "");
if (c) {
ok = c->moveToFirst();
if (ok) {
currentDatabaseRelease = c->getInt32(0);
}
delete c;
if (!ok) {
// No record found ... insert the record
ContentValue cv;
cv.put(KEY_DATABASE_RELEASE_ID, KEY_DATABASE_RELEASE_ID_VALUE);
cv.put(KEY_DATABASE_RELEASE, currentDatabaseRelease);
ok = mDb->sqlInsert(DATABASE_RELEASE_TABLE_NAME, "", cv);
}
} else {
ok = false;
}
}
// Release 1
int newRelease = 1;
if (ok && currentDatabaseRelease < newRelease) {
// Update database
std::list<std::string> files;
ok = startReleaseUpdate(newRelease);
// Move data in files into database
ok = ok && mDb->execSQL("ALTER TABLE " + GRP_TABLE_NAME + " ADD COLUMN " + KEY_NXS_DATA + " BLOB;");
ok = ok && mDb->execSQL("ALTER TABLE " + GRP_TABLE_NAME + " ADD COLUMN " + KEY_NXS_DATA_LEN + " INT;");
ok = ok && mDb->execSQL("ALTER TABLE " + MSG_TABLE_NAME + " ADD COLUMN " + KEY_NXS_DATA + " BLOB;");
ok = ok && mDb->execSQL("ALTER TABLE " + MSG_TABLE_NAME + " ADD COLUMN " + KEY_NXS_DATA_LEN + " INT;");
ok = ok && moveDataFromFileToDatabase(mDb, mServiceDir, GRP_TABLE_NAME, KEY_GRP_ID, files);
ok = ok && moveDataFromFileToDatabase(mDb, mServiceDir, MSG_TABLE_NAME, KEY_MSG_ID, files);
// SQLite doesn't support DROP COLUMN
// ok = ok && mDb->execSQL("ALTER TABLE " + GRP_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_OLD + ";");
// ok = ok && mDb->execSQL("ALTER TABLE " + GRP_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_OFFSET_OLD + ";");
// ok = ok && mDb->execSQL("ALTER TABLE " + GRP_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_LEN_OLD + ";");
// ok = ok && mDb->execSQL("ALTER TABLE " + MSG_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_OLD + ";");
// ok = ok && mDb->execSQL("ALTER TABLE " + MSG_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_OFFSET_OLD + ";");
// ok = ok && mDb->execSQL("ALTER TABLE " + MSG_TABLE_NAME + " DROP COLUMN " + KEY_NXS_FILE_LEN_OLD + ";");
ok = finishReleaseUpdate(newRelease, ok);
if (ok) {
// Remove transfered files
std::list<std::string>::const_iterator file;
for (file = files.begin(); file != files.end(); ++file) {
remove(file->c_str());
}
currentDatabaseRelease = newRelease;
}
}
}
if (ok) {
std::cerr << "Database " << mDbName << " release " << currentDatabaseRelease << " successfully initialised." << std::endl;
} else {
std::cerr << "Database " << mDbName << " initialisation failed." << std::endl;
}
}
bool RsDataService::startReleaseUpdate(int release)
{
// Update database
std::cerr << "Database " << mDbName << " update to release " << release << "." << std::endl;
return mDb->beginTransaction();
}
bool RsDataService::finishReleaseUpdate(int release, bool result)
{
if (result) {
std::string where;
rs_sprintf(where, "%s=%d", KEY_DATABASE_RELEASE_ID.c_str(), KEY_DATABASE_RELEASE_ID_VALUE);
ContentValue cv;
cv.put(KEY_DATABASE_RELEASE, release);
result = mDb->sqlUpdate(DATABASE_RELEASE_TABLE_NAME, where, cv);
}
if (result) {
result = mDb->commitTransaction();
} else {
result = mDb->rollbackTransaction();
}
if (result) {
std::cerr << "Database " << mDbName << " successfully updated to release " << release << "." << std::endl;
} else {
std::cerr << "Database " << mDbName << " update to release " << release << "failed." << std::endl;
}
return result;
}
RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c)
@ -318,7 +529,7 @@ RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c)
grpMeta->mPublishTs = c.getInt32(COL_TIME_STAMP);
grpMeta->mGroupFlags = c.getInt32(COL_NXS_FLAGS);
grpMeta->mGrpSize = c.getInt32(COL_NXS_FILE_LEN);
grpMeta->mGrpSize = c.getInt32(COL_GRP_DATA_LEN);
offset = 0;
@ -387,41 +598,12 @@ RsNxsGrp* RsDataService::locked_getGroup(RetroCursor &c)
grp->meta.GetTlv(data, data_len, &offset);
}
/* now retrieve grp data from file */
std::string grpFile;
c.getString(COL_NXS_FILE, grpFile);
ok &= !grpFile.empty();
/* now retrieve grp data */
offset = 0; data_len = 0;
if(ok){
data_len = c.getInt32(COL_NXS_FILE_LEN);
offset = c.getInt32(COL_NXS_FILE_OFFSET);
// first try to find the file in the service dir
if (RsDirUtil::fileExists(mServiceDir + "/" + grpFile)) {
grpFile.insert(0, mServiceDir + "/");
} else if (RsDirUtil::fileExists(grpFile)) {
// use old way for backward compatibility
//TODO: can be removed later
} else {
ok = false;
//#ifdef RS_DATA_SERVICE_DEBUG
std::cerr << "RsDataService::locked_getGroup() cannot find group file " << grpFile;
std::cerr << std::endl;
//#endif
}
if (ok) {
char grp_data[data_len];
std::ifstream istrm(grpFile.c_str(), std::ios::binary);
istrm.seekg(offset, std::ios::beg);
istrm.read(grp_data, data_len);
istrm.close();
offset = 0;
ok &= grp->grp.GetTlv(grp_data, data_len, &offset);
}
data = (char*)c.getData(COL_NXS_DATA, data_len);
if(data)
ok &= grp->grp.GetTlv(data, data_len, &offset);
}
if(ok)
@ -466,7 +648,7 @@ RsGxsMsgMetaData* RsDataService::locked_getMsgMeta(RetroCursor &c)
offset = 0;
data = (char*)c.getData(COL_SIGN_SET, data_len);
msgMeta->signSet.GetTlv(data, data_len, &offset);
msgMeta->mMsgSize = c.getInt32(COL_NXS_FILE_LEN);
msgMeta->mMsgSize = c.getInt32(COL_MSG_DATA_LEN);
msgMeta->mMsgFlags = c.getInt32(COL_NXS_FLAGS);
msgMeta->mPublishTs = c.getInt32(COL_TIME_STAMP);
@ -517,41 +699,12 @@ RsNxsMsg* RsDataService::locked_getMessage(RetroCursor &c)
msg->meta.GetTlv(data, data_len, &offset);
}
/* now retrieve grp data from file */
std::string msgFile;
c.getString(COL_NXS_FILE, msgFile);
offset = c.getInt32(COL_NXS_FILE_OFFSET);
data_len = c.getInt32(COL_NXS_FILE_LEN);
ok &= !msgFile.empty();
/* now retrieve msg data */
offset = 0; data_len = 0;
if(ok){
// first try to find the file in the service dir
if (RsDirUtil::fileExists(mServiceDir + "/" + msgFile)) {
msgFile.insert(0, mServiceDir + "/");
} else if (RsDirUtil::fileExists(msgFile)) {
// use old way for backward compatibility
//TODO: can be removed later
} else {
ok = false;
//#ifdef RS_DATA_SERVICE_DEBUG
std::cerr << "RsDataService::locked_getMessage() cannot find message file " << msgFile;
std::cerr << std::endl;
//#endif
}
if (ok) {
char* msg_data = new char[data_len];
std::ifstream istrm(msgFile.c_str(), std::ios::binary);
istrm.seekg(offset, std::ios::beg);
istrm.read(msg_data, data_len);
istrm.close();
offset = 0;
ok &= msg->msg.GetTlv(msg_data, data_len, &offset);
delete[] msg_data;
}
data = (char*)c.getData(COL_NXS_DATA, data_len);
if(data)
ok &= msg->msg.GetTlv(data, data_len, &offset);
}
if(ok)
@ -592,18 +745,15 @@ int RsDataService::storeMessage(std::map<RsNxsMsg *, RsGxsMsgMetaData *> &msg)
continue;
}
// create or access file in binary
std::string filename = msgPtr->grpId.toStdString() + "-msgs";
std::string msgFile = mServiceDir + "/" + filename;
std::fstream ostrm(msgFile.c_str(), std::ios::binary | std::ios::app | std::ios::out);
ostrm.seekg(0, std::ios::end); // go to end to append
uint32_t offset = ostrm.tellg(); // get fill offset
ContentValue cv;
cv.put(KEY_NXS_FILE_OFFSET, (int32_t)offset);
cv.put(KEY_NXS_FILE, filename);
cv.put(KEY_NXS_FILE_LEN, (int32_t)msgPtr->msg.TlvSize());
uint32_t dataLen = msgPtr->msg.TlvSize();
char msgData[dataLen];
uint32_t offset = 0;
msgPtr->msg.SetTlv(msgData, dataLen, &offset);
cv.put(KEY_NXS_DATA, dataLen, msgData);
cv.put(KEY_NXS_DATA_LEN, (int32_t)dataLen);
cv.put(KEY_MSG_ID, msgMetaPtr->mMsgId.toStdString());
cv.put(KEY_GRP_ID, msgMetaPtr->mGroupId.toStdString());
cv.put(KEY_NXS_SERV_STRING, msgMetaPtr->mServiceString);
@ -635,13 +785,6 @@ int RsDataService::storeMessage(std::map<RsNxsMsg *, RsGxsMsgMetaData *> &msg)
cv.put(KEY_MSG_STATUS, (int32_t)msgMetaPtr->mMsgStatus);
cv.put(KEY_CHILD_TS, (int32_t)msgMetaPtr->mChildTs);
offset = 0;
char* msgData = new char[msgPtr->msg.TlvSize()];
msgPtr->msg.SetTlv(msgData, msgPtr->msg.TlvSize(), &offset);
ostrm.write(msgData, msgPtr->msg.TlvSize());
ostrm.close();
delete[] msgData;
if (!mDb->sqlInsert(MSG_TABLE_NAME, "", cv))
{
std::cerr << "RsDataService::storeMessage() sqlInsert Failed";
@ -703,22 +846,21 @@ int RsDataService::storeGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
std::cerr << std::endl;
#endif
std::string filename = grpPtr->grpId.toStdString();
std::string grpFile = mServiceDir + "/" + filename;
std::fstream ostrm(grpFile.c_str(), std::ios::binary | std::ios::app | std::ios::out);
ostrm.seekg(0, std::ios::end); // go to end to append
uint32_t offset = ostrm.tellg(); // get fill offset
/*!
* STORE file offset, file length, file name,
* STORE data, data len,
* grpId, flags, publish time stamp, identity,
* id signature, admin signatue, key set, last posting ts
* and meta data
**/
ContentValue cv;
cv.put(KEY_NXS_FILE_OFFSET, (int32_t)offset);
cv.put(KEY_NXS_FILE_LEN, (int32_t)grpPtr->grp.TlvSize());
cv.put(KEY_NXS_FILE, filename);
uint32_t dataLen = grpPtr->grp.TlvSize();
char grpData[dataLen];
uint32_t offset = 0;
grpPtr->grp.SetTlv(grpData, dataLen, &offset);
cv.put(KEY_NXS_DATA, dataLen, grpData);
cv.put(KEY_NXS_DATA_LEN, (int32_t) dataLen);
cv.put(KEY_GRP_ID, grpPtr->grpId.toStdString());
cv.put(KEY_GRP_NAME, grpMetaPtr->mGroupName);
cv.put(KEY_ORIG_GRP_ID, grpMetaPtr->mOrigGrpId.toStdString());
@ -754,12 +896,6 @@ int RsDataService::storeGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
cv.put(KEY_GRP_STATUS, (int32_t)grpMetaPtr->mGroupStatus);
cv.put(KEY_GRP_LAST_POST, (int32_t)grpMetaPtr->mLastPost);
offset = 0;
char grpData[grpPtr->grp.TlvSize()];
grpPtr->grp.SetTlv(grpData, grpPtr->grp.TlvSize(), &offset);
ostrm.write(grpData, grpPtr->grp.TlvSize());
ostrm.close();
if (!mDb->sqlInsert(GRP_TABLE_NAME, "", cv))
{
std::cerr << "RsDataService::storeGroup() sqlInsert Failed";
@ -802,21 +938,20 @@ int RsDataService::updateGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
// if data is larger than max item size do not add
if(!validSize(grpPtr)) continue;
std::string filename = grpPtr->grpId.toStdString();
std::string grpFile = mServiceDir + "/" + filename;
std::ofstream ostrm(grpFile.c_str(), std::ios::binary | std::ios::trunc);
uint32_t offset = 0; // get file offset
/*!
* STORE file offset, file length, file name,
* STORE data, data len,
* grpId, flags, publish time stamp, identity,
* id signature, admin signatue, key set, last posting ts
* and meta data
**/
ContentValue cv;
cv.put(KEY_NXS_FILE_OFFSET, (int32_t)offset);
cv.put(KEY_NXS_FILE_LEN, (int32_t)grpPtr->grp.TlvSize());
cv.put(KEY_NXS_FILE, filename);
uint32_t dataLen = grpPtr->grp.TlvSize();
char grpData[dataLen];
uint32_t offset = 0;
grpPtr->grp.SetTlv(grpData, dataLen, &offset);
cv.put(KEY_NXS_DATA, dataLen, grpData);
cv.put(KEY_NXS_DATA_LEN, (int32_t) dataLen);
cv.put(KEY_GRP_ID, grpPtr->grpId.toStdString());
cv.put(KEY_GRP_NAME, grpMetaPtr->mGroupName);
cv.put(KEY_ORIG_GRP_ID, grpMetaPtr->mOrigGrpId.toStdString());
@ -849,12 +984,6 @@ int RsDataService::updateGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
cv.put(KEY_GRP_STATUS, (int32_t)grpMetaPtr->mGroupStatus);
cv.put(KEY_GRP_LAST_POST, (int32_t)grpMetaPtr->mLastPost);
offset = 0;
char grpData[grpPtr->grp.TlvSize()];
grpPtr->grp.SetTlv(grpData, grpPtr->grp.TlvSize(), &offset);
ostrm.write(grpData, grpPtr->grp.TlvSize());
ostrm.close();
mDb->sqlUpdate(GRP_TABLE_NAME, "grpId='" + grpPtr->grpId.toStdString() + "'", cv);
}
// finish transaction
@ -1368,13 +1497,14 @@ int RsDataService::resetDataStore()
}
mDb->execSQL("DROP INDEX " + MSG_INDEX_GRPID);
mDb->execSQL("DROP TABLE " + DATABASE_RELEASE_TABLE_NAME);
mDb->execSQL("DROP TABLE " + MSG_TABLE_NAME);
mDb->execSQL("DROP TABLE " + GRP_TABLE_NAME);
mDb->execSQL("DROP TRIGGER " + GRP_LAST_POST_UPDATE_TRIGGER);
}
// recreate database
initialise();
initialise(true);
return 1;
}
@ -1395,109 +1525,21 @@ int RsDataService::updateMessageMetaData(MsgLocMetaData &metaData)
+ "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", metaData.val) ? 1 : 0;
}
MsgOffset offSetAccum(const MsgOffset& x, const MsgOffset& y)
{
MsgOffset m;
m.msgLen = y.msgLen + x.msgLen;
return m;
}
int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
{
RsStackMutex stack(mDbMutex);
// for each group
// get for all msgs their offsets and lengths
// for message not contained in msg id vector
// store their data file segments in buffer
// then recalculate the retained messages'
// new offsets, update db with new offsets
// replace old msg file with new file
// remove messages that were not retained from
// db
GxsMsgReq::const_iterator mit = msgIds.begin();
for(; mit != msgIds.end(); ++mit)
{
MsgUpdates updates;
const std::vector<RsGxsMessageId>& msgIdV = mit->second;
const RsGxsGroupId& grpId = mit->first;
GxsMsgReq reqIds;
reqIds.insert(std::make_pair(grpId, std::vector<RsGxsMessageId>() ));
// can get offsets for each file
std::vector<MsgOffset> msgOffsets;
locked_getMessageOffsets(grpId, msgOffsets);
std::string oldFileName = mServiceDir + "/" + grpId.toStdString() + "-msgs";
std::string newFileName = mServiceDir + "/" + grpId.toStdString() + "-msgs-temp";
std::ifstream in(oldFileName.c_str(), std::ios::binary);
std::vector<char> dataBuff, newBuffer;
std::vector<MsgOffset>::iterator vit = msgOffsets.begin();
uint32_t maxSize = 0;// größe aller msgs, newbuf könnte aber kleiner sein, weil msgs weggehen
for(; vit != msgOffsets.end(); ++vit)
maxSize += vit->msgLen;
// may be preferable to determine file len reality
// from file? corrupt db?
dataBuff.reserve(maxSize);// dataBuff.resize(maxSize);
newBuffer.reserve(maxSize);// newBuffer.resize(maxSize);
dataBuff.insert(dataBuff.end(),
std::istreambuf_iterator<char>(in),
std::istreambuf_iterator<char>());
in.close();
uint32_t newOffset = 0;// am anfang der liste ist offset=0, jetzt gehen wir die msgs liste durch
for(std::vector<MsgOffset>::size_type i = 0; i < msgOffsets.size(); ++i)
{
const MsgOffset& m = msgOffsets[i];
//uint32_t newOffset = 0;//hier ist es zu spät, offset muss hochgezählt werden
if(std::find(msgIdV.begin(), msgIdV.end(), m.msgId) == msgIdV.end())
{
MsgUpdate up;
uint32_t msgLen = m.msgLen;
up.msgId = m.msgId;
up.cv.put(KEY_NXS_FILE_OFFSET, (int32_t)newOffset);
newBuffer.insert(newBuffer.end(), dataBuff.begin()+m.msgOffset,
dataBuff.begin()+m.msgOffset+m.msgLen);
newOffset += msgLen;
up.cv.put(KEY_NXS_FILE_LEN, (int32_t)msgLen);
// add msg update
updates[grpId].push_back(up);
}
}
std::ofstream out(newFileName.c_str(), std::ios::binary);
std::copy(newBuffer.begin(), newBuffer.end(),
std::ostreambuf_iterator<char>(out));
out.close();
// now update the new positions in db
locked_updateMessageEntries(updates);
// then delete removed messages
// delete messages
GxsMsgReq msgsToDelete;
msgsToDelete[grpId] = msgIdV;
locked_removeMessageEntries(msgsToDelete);
// now replace old file location with new file
remove(oldFileName.c_str());
RsDirUtil::renameFile(newFileName, oldFileName);
}
return 1;
@ -1508,17 +1550,6 @@ int RsDataService::removeGroups(const std::vector<RsGxsGroupId> &grpIds)
RsStackMutex stack(mDbMutex);
// the grp id is the group file name
// first remove file then remove group
// from db
std::vector<RsGxsGroupId>::const_iterator vit = grpIds.begin();
for(; vit != grpIds.end(); ++vit)
{
const std::string grpFileName = mServiceDir + "/" + (*vit).toStdString();
remove(grpFileName.c_str());
}
locked_removeGroupEntries(grpIds);
return 1;
@ -1605,33 +1636,6 @@ int RsDataService::retrieveMsgIds(const RsGxsGroupId& grpId, RsGxsMessageId::std
}
bool RsDataService::locked_updateMessageEntries(const MsgUpdates& updates)
{
// start a transaction
bool ret = mDb->beginTransaction();
MsgUpdates::const_iterator mit = updates.begin();
for(; mit != updates.end(); ++mit)
{
const RsGxsGroupId& grpId = mit->first;
const std::vector<MsgUpdate>& updateV = mit->second;
std::vector<MsgUpdate>::const_iterator vit = updateV.begin();
for(; vit != updateV.end(); ++vit)
{
const MsgUpdate& update = *vit;
mDb->sqlUpdate(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString()
+ "' AND " + KEY_MSG_ID + "='" + update.msgId.toStdString() + "'", update.cv);
}
}
ret &= mDb->commitTransaction();
return ret;
}
bool RsDataService::locked_removeMessageEntries(const GxsMsgReq& msgIds)
{
// start a transaction
@ -1676,38 +1680,6 @@ bool RsDataService::locked_removeGroupEntries(const std::vector<RsGxsGroupId>& g
return ret;
}
void RsDataService::locked_getMessageOffsets(const RsGxsGroupId& grpId, std::vector<MsgOffset>& offsets)
{
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgOffSetColumns, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", "");
if(c)
{
bool valid = c->moveToFirst();
while(valid)
{
RsGxsMessageId msgId;
int32_t msgLen;
int32_t msgOffSet;
std::string temp;
c->getString(0, temp);
msgId = RsGxsMessageId(temp);
msgOffSet = c->getInt32(1);
msgLen = c->getInt32(2);
MsgOffset offset;
offset.msgId = msgId;
offset.msgLen = msgLen;
offset.msgOffset = msgOffSet;
offsets.push_back(offset);
valid = c->moveToNext();
}
delete c;
}
}
uint32_t RsDataService::cacheSize() const {
return 0;
}

View File

@ -29,15 +29,6 @@
#include "gxs/rsgds.h"
#include "util/retrodb.h"
class MsgOffset
{
public:
MsgOffset() : msgOffset(0), msgLen(0) {}
RsGxsMessageId msgId;
uint32_t msgOffset, msgLen;
};
class MsgUpdate
{
public:
@ -233,8 +224,9 @@ private:
/*!
* Creates an sql database and its associated file
* also creates the message and groups table
* @param isNewDatabase is new database
*/
void initialise();
void initialise(bool isNewDatabase);
/*!
* Remove entries for data base
@ -243,19 +235,21 @@ private:
bool locked_removeMessageEntries(const GxsMsgReq& msgIds);
bool locked_removeGroupEntries(const std::vector<RsGxsGroupId>& grpIds);
typedef std::map<RsGxsGroupId, std::vector<MsgUpdate> > MsgUpdates;
private:
/*!
* Start release update
* @param release
* @return true/false
*/
bool startReleaseUpdate(int release);
/*!
* Update messages entries with new values
* @param msgIds
* @param cv contains values to update message entries with
* Finish release update
* @param release
* @param result
* @return true/false
*/
bool locked_updateMessageEntries(const MsgUpdates& updates);
private:
void locked_getMessageOffsets(const RsGxsGroupId& grpId, std::vector<MsgOffset>& msgOffsets);
bool finishReleaseUpdate(int release, bool result);
private:
@ -263,7 +257,6 @@ private:
std::list<std::string> msgColumns;
std::list<std::string> msgMetaColumns;
std::list<std::string> mMsgOffSetColumns;
std::list<std::string> mMsgIdColumn;
std::list<std::string> grpColumns;