mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-02 21:04:50 -04:00
Add appservice worker
This commit is contained in:
parent
9da84a9a1e
commit
07229bbdae
7 changed files with 364 additions and 118 deletions
|
@ -218,38 +218,37 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
|||
Returns:
|
||||
AppServiceTransaction: A new transaction.
|
||||
"""
|
||||
def _create_appservice_txn(txn):
|
||||
# work out new txn id (highest txn id for this service += 1)
|
||||
# The highest id may be the last one sent (in which case it is last_txn)
|
||||
# or it may be the highest in the txns list (which are waiting to be/are
|
||||
# being sent)
|
||||
last_txn_id = self._get_last_txn(txn, service.id)
|
||||
|
||||
txn.execute(
|
||||
"SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
|
||||
(service.id,)
|
||||
)
|
||||
highest_txn_id = txn.fetchone()[0]
|
||||
if highest_txn_id is None:
|
||||
highest_txn_id = 0
|
||||
|
||||
new_txn_id = max(highest_txn_id, last_txn_id) + 1
|
||||
|
||||
# Insert new txn into txn table
|
||||
event_ids = json.dumps([e.event_id for e in events])
|
||||
txn.execute(
|
||||
"INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
|
||||
"VALUES(?,?,?)",
|
||||
(service.id, new_txn_id, event_ids)
|
||||
)
|
||||
return AppServiceTransaction(
|
||||
service=service, id=new_txn_id, events=events
|
||||
)
|
||||
|
||||
return self.runInteraction(
|
||||
"create_appservice_txn",
|
||||
self._create_appservice_txn,
|
||||
service, events
|
||||
)
|
||||
|
||||
def _create_appservice_txn(self, txn, service, events):
|
||||
# work out new txn id (highest txn id for this service += 1)
|
||||
# The highest id may be the last one sent (in which case it is last_txn)
|
||||
# or it may be the highest in the txns list (which are waiting to be/are
|
||||
# being sent)
|
||||
last_txn_id = self._get_last_txn(txn, service.id)
|
||||
|
||||
txn.execute(
|
||||
"SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
|
||||
(service.id,)
|
||||
)
|
||||
highest_txn_id = txn.fetchone()[0]
|
||||
if highest_txn_id is None:
|
||||
highest_txn_id = 0
|
||||
|
||||
new_txn_id = max(highest_txn_id, last_txn_id) + 1
|
||||
|
||||
# Insert new txn into txn table
|
||||
event_ids = json.dumps([e.event_id for e in events])
|
||||
txn.execute(
|
||||
"INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
|
||||
"VALUES(?,?,?)",
|
||||
(service.id, new_txn_id, event_ids)
|
||||
)
|
||||
return AppServiceTransaction(
|
||||
service=service, id=new_txn_id, events=events
|
||||
_create_appservice_txn,
|
||||
)
|
||||
|
||||
def complete_appservice_txn(self, txn_id, service):
|
||||
|
@ -263,39 +262,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
|||
A Deferred which resolves if this transaction was stored
|
||||
successfully.
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"complete_appservice_txn",
|
||||
self._complete_appservice_txn,
|
||||
txn_id, service
|
||||
)
|
||||
|
||||
def _complete_appservice_txn(self, txn, txn_id, service):
|
||||
txn_id = int(txn_id)
|
||||
|
||||
# Debugging query: Make sure the txn being completed is EXACTLY +1 from
|
||||
# what was there before. If it isn't, we've got problems (e.g. the AS
|
||||
# has probably missed some events), so whine loudly but still continue,
|
||||
# since it shouldn't fail completion of the transaction.
|
||||
last_txn_id = self._get_last_txn(txn, service.id)
|
||||
if (last_txn_id + 1) != txn_id:
|
||||
logger.error(
|
||||
"appservice: Completing a transaction which has an ID > 1 from "
|
||||
"the last ID sent to this AS. We've either dropped events or "
|
||||
"sent it to the AS out of order. FIX ME. last_txn=%s "
|
||||
"completing_txn=%s service_id=%s", last_txn_id, txn_id,
|
||||
service.id
|
||||
def _complete_appservice_txn(txn):
|
||||
# Debugging query: Make sure the txn being completed is EXACTLY +1 from
|
||||
# what was there before. If it isn't, we've got problems (e.g. the AS
|
||||
# has probably missed some events), so whine loudly but still continue,
|
||||
# since it shouldn't fail completion of the transaction.
|
||||
last_txn_id = self._get_last_txn(txn, service.id)
|
||||
if (last_txn_id + 1) != txn_id:
|
||||
logger.error(
|
||||
"appservice: Completing a transaction which has an ID > 1 from "
|
||||
"the last ID sent to this AS. We've either dropped events or "
|
||||
"sent it to the AS out of order. FIX ME. last_txn=%s "
|
||||
"completing_txn=%s service_id=%s", last_txn_id, txn_id,
|
||||
service.id
|
||||
)
|
||||
|
||||
# Set current txn_id for AS to 'txn_id'
|
||||
self._simple_upsert_txn(
|
||||
txn, "application_services_state", dict(as_id=service.id),
|
||||
dict(last_txn=txn_id)
|
||||
)
|
||||
|
||||
# Set current txn_id for AS to 'txn_id'
|
||||
self._simple_upsert_txn(
|
||||
txn, "application_services_state", dict(as_id=service.id),
|
||||
dict(last_txn=txn_id)
|
||||
)
|
||||
# Delete txn
|
||||
self._simple_delete_txn(
|
||||
txn, "application_services_txns",
|
||||
dict(txn_id=txn_id, as_id=service.id)
|
||||
)
|
||||
|
||||
# Delete txn
|
||||
self._simple_delete_txn(
|
||||
txn, "application_services_txns",
|
||||
dict(txn_id=txn_id, as_id=service.id)
|
||||
return self.runInteraction(
|
||||
"complete_appservice_txn",
|
||||
_complete_appservice_txn,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -309,10 +307,25 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
|||
A Deferred which resolves to an AppServiceTransaction or
|
||||
None.
|
||||
"""
|
||||
def _get_oldest_unsent_txn(txn):
|
||||
# Monotonically increasing txn ids, so just select the smallest
|
||||
# one in the txns table (we delete them when they are sent)
|
||||
txn.execute(
|
||||
"SELECT * FROM application_services_txns WHERE as_id=?"
|
||||
" ORDER BY txn_id ASC LIMIT 1",
|
||||
(service.id,)
|
||||
)
|
||||
rows = self.cursor_to_dict(txn)
|
||||
if not rows:
|
||||
return None
|
||||
|
||||
entry = rows[0]
|
||||
|
||||
return entry
|
||||
|
||||
entry = yield self.runInteraction(
|
||||
"get_oldest_unsent_appservice_txn",
|
||||
self._get_oldest_unsent_txn,
|
||||
service
|
||||
_get_oldest_unsent_txn,
|
||||
)
|
||||
|
||||
if not entry:
|
||||
|
@ -326,22 +339,6 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
|||
service=service, id=entry["txn_id"], events=events
|
||||
))
|
||||
|
||||
def _get_oldest_unsent_txn(self, txn, service):
|
||||
# Monotonically increasing txn ids, so just select the smallest
|
||||
# one in the txns table (we delete them when they are sent)
|
||||
txn.execute(
|
||||
"SELECT * FROM application_services_txns WHERE as_id=?"
|
||||
" ORDER BY txn_id ASC LIMIT 1",
|
||||
(service.id,)
|
||||
)
|
||||
rows = self.cursor_to_dict(txn)
|
||||
if not rows:
|
||||
return None
|
||||
|
||||
entry = rows[0]
|
||||
|
||||
return entry
|
||||
|
||||
def _get_last_txn(self, txn, service_id):
|
||||
txn.execute(
|
||||
"SELECT last_txn FROM application_services_state WHERE as_id=?",
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue