Consistently use db_to_json to convert from database values to JSON objects. (#7849)

Patrick Cloke 2020-07-16 11:32:19 -04:00, committed by GitHub
parent b0f031f92a
commit f460da6031
22 changed files with 80 additions and 82 deletions

changelog.d/7849.misc Normal file
View File

@@ -0,0 +1 @@
+Consistently use `db_to_json` to convert from database values to JSON objects.

View File

@@ -100,8 +100,8 @@ def db_to_json(db_content):
     if isinstance(db_content, memoryview):
         db_content = db_content.tobytes()
 
-    # Decode it to a Unicode string before feeding it to json.loads, so we
-    # consistenty get a Unicode-containing object out.
+    # Decode it to a Unicode string before feeding it to json.loads, since
+    # Python 3.5 does not support deserializing bytes.
     if isinstance(db_content, (bytes, bytearray)):
         db_content = db_content.decode("utf8")
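For reference, a minimal sketch of the helper this commit standardizes on. Only the type-normalization block appears in the hunk above; the surrounding function body (docstring and the final json.loads call) is an assumption, not shown in this diff:

    import json

    def db_to_json(db_content):
        # Sketch of synapse.storage._base.db_to_json: normalize whatever the
        # database driver hands back (e.g. a memoryview from psycopg2, or
        # bytes) to a str, then deserialize it.
        if isinstance(db_content, memoryview):
            db_content = db_content.tobytes()

        # Decode it to a Unicode string before feeding it to json.loads, since
        # Python 3.5 does not support deserializing bytes.
        if isinstance(db_content, (bytes, bytearray)):
            db_content = db_content.decode("utf8")

        # Assumed tail: parse the normalized string.
        return json.loads(db_content)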

View File

@@ -249,7 +249,10 @@ class BackgroundUpdater(object):
             retcol="progress_json",
         )
 
-        progress = json.loads(progress_json)
+        # Avoid a circular import.
+        from synapse.storage._base import db_to_json
+
+        progress = db_to_json(progress_json)
 
         time_start = self._clock.time_msec()
         items_updated = await update_handler(progress, batch_size)
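The function-local import here is the standard way to break an import cycle in Python: the module is only resolved when the update actually runs, long after both modules have finished loading. A self-contained sketch of the pattern (load_progress is a hypothetical name, not part of this commit):

    def load_progress(progress_json: str):
        # A module-level "from synapse.storage._base import db_to_json" would
        # trigger the cycle at startup; importing at call time defers it until
        # both modules are fully initialized.
        from synapse.storage._base import db_to_json

        return db_to_json(progress_json)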

View File

@@ -22,7 +22,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@@ -77,7 +77,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             )
 
             global_account_data = {
-                row["account_data_type"]: json.loads(row["content"]) for row in rows
+                row["account_data_type"]: db_to_json(row["content"]) for row in rows
             }
 
             rows = self.db.simple_select_list_txn(
@@ -90,7 +90,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             by_room = {}
             for row in rows:
                 room_data = by_room.setdefault(row["room_id"], {})
-                room_data[row["account_data_type"]] = json.loads(row["content"])
+                room_data[row["account_data_type"]] = db_to_json(row["content"])
 
             return global_account_data, by_room
@@ -113,7 +113,7 @@ class AccountDataWorkerStore(SQLBaseStore):
         )
 
         if result:
-            return json.loads(result)
+            return db_to_json(result)
         else:
             return None
@@ -137,7 +137,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             )
 
             return {
-                row["account_data_type"]: json.loads(row["content"]) for row in rows
+                row["account_data_type"]: db_to_json(row["content"]) for row in rows
             }
 
         return self.db.runInteraction(
@@ -170,7 +170,7 @@ class AccountDataWorkerStore(SQLBaseStore):
                 allow_none=True,
             )
 
-            return json.loads(content_json) if content_json else None
+            return db_to_json(content_json) if content_json else None
 
         return self.db.runInteraction(
             "get_account_data_for_room_and_type", get_account_data_for_room_and_type_txn
@@ -255,7 +255,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             txn.execute(sql, (user_id, stream_id))
 
-            global_account_data = {row[0]: json.loads(row[1]) for row in txn}
+            global_account_data = {row[0]: db_to_json(row[1]) for row in txn}
 
             sql = (
                 "SELECT room_id, account_data_type, content FROM room_account_data"
@@ -267,7 +267,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             account_data_by_room = {}
             for row in txn:
                 room_account_data = account_data_by_room.setdefault(row[0], {})
-                room_account_data[row[1]] = json.loads(row[2])
+                room_account_data[row[1]] = db_to_json(row[2])
 
             return global_account_data, account_data_by_room

View File

@@ -22,7 +22,7 @@ from twisted.internet import defer
 
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.database import Database
@@ -303,7 +303,7 @@ class ApplicationServiceTransactionWorkerStore(
         if not entry:
             return None
 
-        event_ids = json.loads(entry["event_ids"])
+        event_ids = db_to_json(entry["event_ids"])
 
         events = yield self.get_events_as_list(event_ids)

View File

@@ -21,7 +21,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -65,7 +65,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(json.loads(row[1]))
+                messages.append(db_to_json(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return messages, stream_pos
@@ -173,7 +173,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(json.loads(row[1]))
+                messages.append(db_to_json(row[1]))
             if len(messages) < limit:
                 log_kv({"message": "Set stream position to current position"})
                 stream_pos = current_stream_id

View File

@@ -577,7 +577,7 @@ class DeviceWorkerStore(SQLBaseStore):
             rows = yield self.db.execute(
                 "get_users_whose_signatures_changed", None, sql, user_id, from_key
             )
-            return {user for row in rows for user in json.loads(row[0])}
+            return {user for row in rows for user in db_to_json(row[0])}
         else:
             return set()

View File

@@ -14,13 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
+from canonicaljson import json
 
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 from synapse.logging.opentracing import log_kv, trace
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 
 
 class EndToEndRoomKeyStore(SQLBaseStore):
@@ -148,7 +148,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 "forwarded_count": row["forwarded_count"],
                 # is_verified must be returned to the client as a boolean
                 "is_verified": bool(row["is_verified"]),
-                "session_data": json.loads(row["session_data"]),
+                "session_data": db_to_json(row["session_data"]),
             }
 
         return sessions
@@ -222,7 +222,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 "first_message_index": row[2],
                 "forwarded_count": row[3],
                 "is_verified": row[4],
-                "session_data": json.loads(row[5]),
+                "session_data": db_to_json(row[5]),
             }
 
         return ret
@@ -319,7 +319,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
             retcols=("version", "algorithm", "auth_data", "etag"),
         )
-        result["auth_data"] = json.loads(result["auth_data"])
+        result["auth_data"] = db_to_json(result["auth_data"])
         result["version"] = str(result["version"])
         if result["etag"] is None:
             result["etag"] = 0

View File

@@ -366,7 +366,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         for row in rows:
             user_id = row["user_id"]
             key_type = row["keytype"]
-            key = json.loads(row["keydata"])
+            key = db_to_json(row["keydata"])
             user_info = result.setdefault(user_id, {})
             user_info[key_type] = key

View File

@@ -21,7 +21,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
 from synapse.storage.database import Database
 from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -58,7 +58,7 @@ def _deserialize_action(actions, is_highlight):
     """Custom deserializer for actions. This allows us to "compress" common actions
     """
     if actions:
-        return json.loads(actions)
+        return db_to_json(actions)
 
     if is_highlight:
         return DEFAULT_HIGHLIGHT_ACTION
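The "compression" that docstring refers to: the two most common action lists are stored as an empty string plus the is_highlight flag, so only unusual action lists pay for a JSON round-trip. A sketch of both directions; the serializer side and the two constants' values are assumptions based on the deserializer shown in this diff:

    import json

    DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]
    DEFAULT_HIGHLIGHT_ACTION = [
        "notify",
        {"set_tweak": "sound", "value": "default"},
        {"set_tweak": "highlight"},
    ]

    def _serialize_action(actions, is_highlight):
        # Store "" for the common cases; is_highlight disambiguates on read.
        if is_highlight:
            if actions == DEFAULT_HIGHLIGHT_ACTION:
                return ""
        elif actions == DEFAULT_NOTIF_ACTION:
            return ""
        return json.dumps(actions)

    def _deserialize_action(actions, is_highlight):
        # Mirrors the hunk above (Synapse itself now calls db_to_json here).
        if actions:
            return json.loads(actions)
        return DEFAULT_HIGHLIGHT_ACTION if is_highlight else DEFAULT_NOTIF_ACTION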

View File

@@ -20,7 +20,6 @@ from collections import OrderedDict, namedtuple
 from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
 
 import attr
-from canonicaljson import json
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -32,7 +31,7 @@ from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.logging.utils import log_function
-from synapse.storage._base import make_in_list_sql_clause
+from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.data_stores.main.search import SearchEntry
 from synapse.storage.database import Database, LoggingTransaction
 from synapse.storage.util.id_generators import StreamIdGenerator
@@ -236,7 +235,7 @@ class PersistEventsStore:
             )
 
             txn.execute(sql + clause, args)
-            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
+            results.extend(r[0] for r in txn if not db_to_json(r[1]).get("soft_failed"))
 
         for chunk in batch_iter(event_ids, 100):
             yield self.db.runInteraction(
@@ -297,7 +296,7 @@ class PersistEventsStore:
                 if prev_event_id in existing_prevs:
                     continue
 
-                soft_failed = json.loads(metadata).get("soft_failed")
+                soft_failed = db_to_json(metadata).get("soft_failed")
                 if soft_failed or rejected:
                     to_recursively_check.append(prev_event_id)
                     existing_prevs.add(prev_event_id)
@@ -583,7 +582,7 @@ class PersistEventsStore:
         txn.execute(sql, (room_id, EventTypes.Create, ""))
         row = txn.fetchone()
         if row:
-            event_json = json.loads(row[0])
+            event_json = db_to_json(row[0])
             content = event_json.get("content", {})
 
             creator = content.get("creator")
             room_version_id = content.get("room_version", RoomVersions.V1.identifier)

View File

@@ -15,12 +15,10 @@
 
 import logging
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventContentFields
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 
 logger = logging.getLogger(__name__)
@@ -125,7 +123,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             for row in rows:
                 try:
                     event_id = row[1]
-                    event_json = json.loads(row[2])
+                    event_json = db_to_json(row[2])
                     sender = event_json["sender"]
                     content = event_json["content"]
@@ -208,7 +206,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             for row in ev_rows:
                 event_id = row["event_id"]
-                event_json = json.loads(row["json"])
+                event_json = db_to_json(row["json"])
                 try:
                     origin_server_ts = event_json["origin_server_ts"]
                 except (KeyError, AttributeError):
@@ -317,7 +315,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
                 soft_failed = False
                 if metadata:
-                    soft_failed = json.loads(metadata).get("soft_failed")
+                    soft_failed = db_to_json(metadata).get("soft_failed")
 
                 if soft_failed or rejected:
                     soft_failed_events_to_lookup.add(event_id)
@@ -358,7 +356,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
                     graph[event_id] = {prev_event_id}
 
-                soft_failed = json.loads(metadata).get("soft_failed")
+                soft_failed = db_to_json(metadata).get("soft_failed")
                 if soft_failed or rejected:
                     soft_failed_events_to_lookup.add(event_id)
                 else:
@@ -543,7 +541,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             last_row_event_id = ""
             for (event_id, event_json_raw) in results:
                 try:
-                    event_json = json.loads(event_json_raw)
+                    event_json = db_to_json(event_json_raw)
 
                     self.db.simple_insert_many_txn(
                         txn=txn,

View File

@@ -21,7 +21,6 @@ import threading
 from collections import namedtuple
 from typing import List, Optional, Tuple
 
-from canonicaljson import json
 from constantly import NamedConstant, Names
 
 from twisted.internet import defer
@@ -40,7 +39,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import get_domain_from_id
@@ -611,8 +610,8 @@ class EventsWorkerStore(SQLBaseStore):
             if not allow_rejected and rejected_reason:
                 continue
 
-            d = json.loads(row["json"])
-            internal_metadata = json.loads(row["internal_metadata"])
+            d = db_to_json(row["json"])
+            internal_metadata = db_to_json(row["internal_metadata"])
 
             format_version = row["format_version"]
             if format_version is None:

View File

@@ -21,7 +21,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 
 # The category ID for the "default" category. We don't store as null in the
 # database to avoid the fun of null != null
@@ -197,7 +197,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             categories = {
                 row[0]: {
                     "is_public": row[1],
-                    "profile": json.loads(row[2]),
+                    "profile": db_to_json(row[2]),
                     "order": row[3],
                 }
                 for row in txn
@@ -221,7 +221,7 @@ class GroupServerWorkerStore(SQLBaseStore):
         return {
             row["category_id"]: {
                 "is_public": row["is_public"],
-                "profile": json.loads(row["profile"]),
+                "profile": db_to_json(row["profile"]),
             }
             for row in rows
         }
@@ -235,7 +235,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             desc="get_group_category",
         )
 
-        category["profile"] = json.loads(category["profile"])
+        category["profile"] = db_to_json(category["profile"])
 
         return category
@@ -251,7 +251,7 @@ class GroupServerWorkerStore(SQLBaseStore):
         return {
             row["role_id"]: {
                 "is_public": row["is_public"],
-                "profile": json.loads(row["profile"]),
+                "profile": db_to_json(row["profile"]),
             }
             for row in rows
         }
@@ -265,7 +265,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             desc="get_group_role",
         )
 
-        role["profile"] = json.loads(role["profile"])
+        role["profile"] = db_to_json(role["profile"])
 
         return role
@@ -333,7 +333,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             roles = {
                 row[0]: {
                     "is_public": row[1],
-                    "profile": json.loads(row[2]),
+                    "profile": db_to_json(row[2]),
                     "order": row[3],
                 }
                 for row in txn
@@ -462,7 +462,7 @@ class GroupServerWorkerStore(SQLBaseStore):
         now = int(self._clock.time_msec())
         if row and now < row["valid_until_ms"]:
-            return json.loads(row["attestation_json"])
+            return db_to_json(row["attestation_json"])
 
         return None
@@ -489,7 +489,7 @@ class GroupServerWorkerStore(SQLBaseStore):
                     "group_id": row[0],
                     "type": row[1],
                     "membership": row[2],
-                    "content": json.loads(row[3]),
+                    "content": db_to_json(row[3]),
                 }
                 for row in txn
             ]
@@ -519,7 +519,7 @@ class GroupServerWorkerStore(SQLBaseStore):
                     "group_id": group_id,
                     "membership": membership,
                     "type": gtype,
-                    "content": json.loads(content_json),
+                    "content": db_to_json(content_json),
                 }
                 for group_id, membership, gtype, content_json in txn
             ]
@@ -567,7 +567,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             """
             txn.execute(sql, (last_id, current_id, limit))
             updates = [
-                (stream_id, (group_id, user_id, gtype, json.loads(content_json)))
+                (stream_id, (group_id, user_id, gtype, db_to_json(content_json)))
                 for stream_id, group_id, user_id, gtype, content_json in txn
            ]

View File

@@ -24,7 +24,7 @@ from twisted.internet import defer
 
 from synapse.push.baserules import list_with_base_rules
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.data_stores.main.appservice import ApplicationServiceWorkerStore
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.data_stores.main.pusher import PusherWorkerStore
@@ -43,8 +43,8 @@ def _load_rules(rawrules, enabled_map):
     ruleslist = []
     for rawrule in rawrules:
         rule = dict(rawrule)
-        rule["conditions"] = json.loads(rawrule["conditions"])
-        rule["actions"] = json.loads(rawrule["actions"])
+        rule["conditions"] = db_to_json(rawrule["conditions"])
+        rule["actions"] = db_to_json(rawrule["actions"])
         rule["default"] = False
         ruleslist.append(rule)

View File

@@ -17,11 +17,11 @@
 import logging
 from typing import Iterable, Iterator, List, Tuple
 
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
 
 from twisted.internet import defer
 
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 
 logger = logging.getLogger(__name__)
@@ -36,7 +36,7 @@ class PusherWorkerStore(SQLBaseStore):
         for r in rows:
             dataJson = r["data"]
             try:
-                r["data"] = json.loads(dataJson)
+                r["data"] = db_to_json(dataJson)
             except Exception as e:
                 logger.warning(
                     "Invalid JSON in data for pusher %d: %s, %s",

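Note the defensive decode in that last hunk: pusher rows are parsed one at a time, so a single corrupt "data" column is logged and skipped rather than failing the whole query. A standalone sketch of the same idea (decode_pusher_rows is a hypothetical helper, and the "id" key is assumed for the log line; Synapse does this inline):

    import json
    import logging

    logger = logging.getLogger(__name__)

    def decode_pusher_rows(rows):
        # Yield rows with their "data" column deserialized, dropping any row
        # whose stored JSON cannot be parsed.
        for r in rows:
            data_json = r["data"]
            try:
                r["data"] = json.loads(data_json)
            except Exception as e:
                logger.warning(
                    "Invalid JSON in data for pusher %d: %s, %s",
                    r["id"], data_json, e,
                )
                continue
            yield r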
View File

@@ -22,7 +22,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.util.async_helpers import ObservableDeferred
@@ -203,7 +203,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
         for row in rows:
             content.setdefault(row["event_id"], {}).setdefault(row["receipt_type"], {})[
                 row["user_id"]
-            ] = json.loads(row["data"])
+            ] = db_to_json(row["data"])
 
         return [{"type": "m.receipt", "room_id": room_id, "content": content}]
@@ -260,7 +260,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             event_entry = room_event["content"].setdefault(row["event_id"], {})
             receipt_type = event_entry.setdefault(row["receipt_type"], {})
 
-            receipt_type[row["user_id"]] = json.loads(row["data"])
+            receipt_type[row["user_id"]] = db_to_json(row["data"])
 
         results = {
             room_id: [results[room_id]] if room_id in results else []
@@ -329,7 +329,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             """
             txn.execute(sql, (last_id, current_id, limit))
-            updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn]
+            updates = [(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn]
 
             limited = False
             upper_bound = current_id

View File

@@ -28,7 +28,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import RoomVersion, RoomVersions
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.data_stores.main.search import SearchStore
 from synapse.storage.database import Database, LoggingTransaction
 from synapse.types import ThirdPartyInstanceID
@@ -670,7 +670,7 @@ class RoomWorkerStore(SQLBaseStore):
             next_token = None
             for stream_ordering, content_json in txn:
                 next_token = stream_ordering
-                event_json = json.loads(content_json)
+                event_json = db_to_json(content_json)
                 content = event_json["content"]
                 content_url = content.get("url")
                 thumbnail_url = content.get("info", {}).get("thumbnail_url")
@@ -915,7 +915,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
                 if not row["json"]:
                     retention_policy = {}
                 else:
-                    ev = json.loads(row["json"])
+                    ev = db_to_json(row["json"])
                     retention_policy = ev["content"]
 
                 self.db.simple_insert_txn(
@@ -971,7 +971,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
             updates = []
             for room_id, event_json in txn:
-                event_dict = json.loads(event_json)
+                event_dict = db_to_json(event_json)
                 room_version_id = event_dict.get("content", {}).get(
                     "room_version", RoomVersions.V1.identifier
                 )

View File

@@ -17,8 +17,6 @@
 import logging
 from typing import Iterable, List, Set
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
@@ -27,6 +25,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import (
     LoggingTransaction,
     SQLBaseStore,
+    db_to_json,
     make_in_list_sql_clause,
 )
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
@@ -938,7 +937,7 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
                 event_id = row["event_id"]
                 room_id = row["room_id"]
                 try:
-                    event_json = json.loads(row["json"])
+                    event_json = db_to_json(row["json"])
                     content = event_json["content"]
                 except Exception:
                     continue

View File

@@ -17,12 +17,10 @@ import logging
 import re
 from collections import namedtuple
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
 from synapse.storage.database import Database
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
@@ -157,7 +155,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 stream_ordering = row["stream_ordering"]
                 origin_server_ts = row["origin_server_ts"]
                 try:
-                    event_json = json.loads(row["json"])
+                    event_json = db_to_json(row["json"])
                     content = event_json["content"]
                 except Exception:
                     continue

View File

@@ -21,6 +21,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.storage._base import db_to_json
 from synapse.storage.data_stores.main.account_data import AccountDataWorkerStore
 from synapse.util.caches.descriptors import cached
@@ -49,7 +50,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             tags_by_room = {}
             for row in rows:
                 room_tags = tags_by_room.setdefault(row["room_id"], {})
-                room_tags[row["tag"]] = json.loads(row["content"])
+                room_tags[row["tag"]] = db_to_json(row["content"])
             return tags_by_room
 
         return deferred
@@ -180,7 +181,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             retcols=("tag", "content"),
             desc="get_tags_for_room",
         ).addCallback(
-            lambda rows: {row["tag"]: json.loads(row["content"]) for row in rows}
+            lambda rows: {row["tag"]: db_to_json(row["content"]) for row in rows}
        )

View File

@@ -12,13 +12,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import json
 from typing import Any, Dict, Optional, Union
 
 import attr
+from canonicaljson import json
 
 from synapse.api.errors import StoreError
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.types import JsonDict
 from synapse.util import stringutils as stringutils
@@ -118,7 +118,7 @@ class UIAuthWorkerStore(SQLBaseStore):
             desc="get_ui_auth_session",
         )
 
-        result["clientdict"] = json.loads(result["clientdict"])
+        result["clientdict"] = db_to_json(result["clientdict"])
 
         return UIAuthSessionData(session_id, **result)
@@ -168,7 +168,7 @@ class UIAuthWorkerStore(SQLBaseStore):
             retcols=("stage_type", "result"),
             desc="get_completed_ui_auth_stages",
         ):
-            results[row["stage_type"]] = json.loads(row["result"])
+            results[row["stage_type"]] = db_to_json(row["result"])
 
         return results
@@ -224,7 +224,7 @@ class UIAuthWorkerStore(SQLBaseStore):
         )
 
         # Update it and add it back to the database.
-        serverdict = json.loads(result["serverdict"])
+        serverdict = db_to_json(result["serverdict"])
         serverdict[key] = value
 
         self.db.simple_update_one_txn(
@@ -254,7 +254,7 @@ class UIAuthWorkerStore(SQLBaseStore):
             desc="get_ui_auth_session_data",
         )
 
-        serverdict = json.loads(result["serverdict"])
+        serverdict = db_to_json(result["serverdict"])
 
         return serverdict.get(key, default)
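The serverdict hunks are a read-modify-write of a JSON column: decode the stored object, set one key, re-encode, and write it back. A self-contained sqlite3 sketch of the pattern; the ui_auth_sessions table name is an assumption (the diff only shows the serverdict column), and Synapse itself performs this inside a single database transaction:

    import json
    import sqlite3

    def set_ui_auth_session_data(conn: sqlite3.Connection, session_id: str, key: str, value):
        # Read the current JSON object for the session.
        row = conn.execute(
            "SELECT serverdict FROM ui_auth_sessions WHERE session_id = ?",
            (session_id,),
        ).fetchone()
        serverdict = json.loads(row[0])

        # Update it and add it back to the database.
        serverdict[key] = value
        conn.execute(
            "UPDATE ui_auth_sessions SET serverdict = ? WHERE session_id = ?",
            (json.dumps(serverdict), session_id),
        )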