mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
455ef04187
simple_update_many_txn had a bug in it which would cause each update to be applied twice.
807 lines
28 KiB
Python
807 lines
28 KiB
Python
# Copyright 2014-2016 OpenMarket Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from collections import OrderedDict
|
|
from typing import Generator
|
|
from unittest.mock import Mock, call, patch
|
|
|
|
from twisted.internet import defer
|
|
|
|
from synapse.storage._base import SQLBaseStore
|
|
from synapse.storage.database import DatabasePool
|
|
from synapse.storage.engines import create_engine
|
|
|
|
from tests import unittest
|
|
from tests.server import TestHomeServer
|
|
from tests.utils import USE_POSTGRES_FOR_TESTS, default_config
|
|
|
|
|
|
class SQLBaseStoreTestCase(unittest.TestCase):
|
|
"""Test the "simple" SQL generating methods in SQLBaseStore."""
|
|
|
|
def setUp(self) -> None:
|
|
# This is the Twisted connection pool.
|
|
conn_pool = Mock(spec=["runInteraction", "runWithConnection"])
|
|
self.mock_txn = Mock()
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
# To avoid testing psycopg2 itself, patch execute_batch/execute_values
|
|
# to assert how it is called.
|
|
from psycopg2 import extras
|
|
|
|
self.mock_execute_batch = Mock()
|
|
self.execute_batch_patcher = patch.object(
|
|
extras, "execute_batch", new=self.mock_execute_batch
|
|
)
|
|
self.execute_batch_patcher.start()
|
|
self.mock_execute_values = Mock()
|
|
self.execute_values_patcher = patch.object(
|
|
extras, "execute_values", new=self.mock_execute_values
|
|
)
|
|
self.execute_values_patcher.start()
|
|
|
|
self.mock_conn = Mock(
|
|
spec_set=[
|
|
"cursor",
|
|
"rollback",
|
|
"commit",
|
|
"closed",
|
|
"reconnect",
|
|
"set_session",
|
|
"encoding",
|
|
]
|
|
)
|
|
self.mock_conn.encoding = "UNICODE"
|
|
else:
|
|
self.mock_conn = Mock(spec_set=["cursor", "rollback", "commit"])
|
|
self.mock_conn.cursor.return_value = self.mock_txn
|
|
self.mock_txn.connection = self.mock_conn
|
|
self.mock_conn.rollback.return_value = None
|
|
# Our fake runInteraction just runs synchronously inline
|
|
|
|
def runInteraction(func, *args, **kwargs) -> defer.Deferred: # type: ignore[no-untyped-def]
|
|
return defer.succeed(func(self.mock_txn, *args, **kwargs))
|
|
|
|
conn_pool.runInteraction = runInteraction
|
|
|
|
def runWithConnection(func, *args, **kwargs): # type: ignore[no-untyped-def]
|
|
return defer.succeed(func(self.mock_conn, *args, **kwargs))
|
|
|
|
conn_pool.runWithConnection = runWithConnection
|
|
|
|
config = default_config(name="test", parse=True)
|
|
hs = TestHomeServer("test", config=config)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
db_config = {"name": "psycopg2", "args": {}}
|
|
else:
|
|
db_config = {"name": "sqlite3"}
|
|
engine = create_engine(db_config)
|
|
|
|
fake_engine = Mock(wraps=engine)
|
|
fake_engine.in_transaction.return_value = False
|
|
fake_engine.module.OperationalError = engine.module.OperationalError
|
|
fake_engine.module.DatabaseError = engine.module.DatabaseError
|
|
fake_engine.module.IntegrityError = engine.module.IntegrityError
|
|
# Don't convert param style to make assertions easier.
|
|
fake_engine.convert_param_style = lambda sql: sql
|
|
# To fix isinstance(...) checks.
|
|
fake_engine.__class__ = engine.__class__ # type: ignore[assignment]
|
|
|
|
db = DatabasePool(Mock(), Mock(config=db_config), fake_engine)
|
|
db._db_pool = conn_pool
|
|
|
|
self.datastore = SQLBaseStore(db, None, hs) # type: ignore[arg-type]
|
|
|
|
def tearDown(self) -> None:
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.execute_batch_patcher.stop()
|
|
self.execute_values_patcher.stop()
|
|
|
|
@defer.inlineCallbacks
|
|
def test_insert_1col(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_insert(
|
|
table="tablename", values={"columname": "Value"}
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"INSERT INTO tablename (columname) VALUES(?)", ("Value",)
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_insert_3cols(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_insert(
|
|
table="tablename",
|
|
# Use OrderedDict() so we can assert on the SQL generated
|
|
values=OrderedDict([("colA", 1), ("colB", 2), ("colC", 3)]),
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"INSERT INTO tablename (colA, colB, colC) VALUES(?, ?, ?)", (1, 2, 3)
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_insert_many(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_insert_many(
|
|
table="tablename",
|
|
keys=(
|
|
"col1",
|
|
"col2",
|
|
),
|
|
values=[
|
|
(
|
|
"val1",
|
|
"val2",
|
|
),
|
|
("val3", "val4"),
|
|
],
|
|
desc="",
|
|
)
|
|
)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.mock_execute_values.assert_called_once_with(
|
|
self.mock_txn,
|
|
"INSERT INTO tablename (col1, col2) VALUES ?",
|
|
[("val1", "val2"), ("val3", "val4")],
|
|
template=None,
|
|
fetch=False,
|
|
)
|
|
else:
|
|
self.mock_txn.executemany.assert_called_once_with(
|
|
"INSERT INTO tablename (col1, col2) VALUES(?, ?)",
|
|
[("val1", "val2"), ("val3", "val4")],
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_insert_many_no_iterable(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_insert_many(
|
|
table="tablename",
|
|
keys=(
|
|
"col1",
|
|
"col2",
|
|
),
|
|
values=[],
|
|
desc="",
|
|
)
|
|
)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.mock_execute_values.assert_not_called()
|
|
else:
|
|
self.mock_txn.executemany.assert_not_called()
|
|
|
|
@defer.inlineCallbacks
|
|
def test_select_one_1col(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
self.mock_txn.__iter__ = Mock(return_value=iter([("Value",)]))
|
|
|
|
value = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_select_one_onecol(
|
|
table="tablename", keyvalues={"keycol": "TheKey"}, retcol="retcol"
|
|
)
|
|
)
|
|
|
|
self.assertEqual("Value", value)
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"SELECT retcol FROM tablename WHERE keycol = ?", ["TheKey"]
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_select_one_3col(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
self.mock_txn.fetchone.return_value = (1, 2, 3)
|
|
|
|
ret = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_select_one(
|
|
table="tablename",
|
|
keyvalues={"keycol": "TheKey"},
|
|
retcols=["colA", "colB", "colC"],
|
|
)
|
|
)
|
|
|
|
self.assertEqual({"colA": 1, "colB": 2, "colC": 3}, ret)
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"SELECT colA, colB, colC FROM tablename WHERE keycol = ?", ["TheKey"]
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_select_one_missing(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 0
|
|
self.mock_txn.fetchone.return_value = None
|
|
|
|
ret = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_select_one(
|
|
table="tablename",
|
|
keyvalues={"keycol": "Not here"},
|
|
retcols=["colA"],
|
|
allow_none=True,
|
|
)
|
|
)
|
|
|
|
self.assertFalse(ret)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_select_list(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 3
|
|
self.mock_txn.fetchall.return_value = [(1,), (2,), (3,)]
|
|
self.mock_txn.description = (("colA", None, None, None, None, None, None),)
|
|
|
|
ret = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_select_list(
|
|
table="tablename", keyvalues={"keycol": "A set"}, retcols=["colA"]
|
|
)
|
|
)
|
|
|
|
self.assertEqual([(1,), (2,), (3,)], ret)
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"SELECT colA FROM tablename WHERE keycol = ?", ["A set"]
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_select_many_batch(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 3
|
|
self.mock_txn.fetchall.side_effect = [[(1,), (2,)], [(3,)]]
|
|
|
|
ret = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_select_many_batch(
|
|
table="tablename",
|
|
column="col1",
|
|
iterable=("val1", "val2", "val3"),
|
|
retcols=("col2",),
|
|
keyvalues={"col3": "val4"},
|
|
batch_size=2,
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_has_calls(
|
|
[
|
|
call(
|
|
"SELECT col2 FROM tablename WHERE col1 = ANY(?) AND col3 = ?",
|
|
[["val1", "val2"], "val4"],
|
|
),
|
|
call(
|
|
"SELECT col2 FROM tablename WHERE col1 = ANY(?) AND col3 = ?",
|
|
[["val3"], "val4"],
|
|
),
|
|
],
|
|
)
|
|
self.assertEqual([(1,), (2,), (3,)], ret)
|
|
|
|
def test_select_many_no_iterable(self) -> None:
|
|
self.mock_txn.rowcount = 3
|
|
self.mock_txn.fetchall.side_effect = [(1,), (2,)]
|
|
|
|
ret = self.datastore.db_pool.simple_select_many_txn(
|
|
self.mock_txn,
|
|
table="tablename",
|
|
column="col1",
|
|
iterable=(),
|
|
retcols=("col2",),
|
|
keyvalues={"col3": "val4"},
|
|
)
|
|
|
|
self.mock_txn.execute.assert_not_called()
|
|
self.assertEqual([], ret)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_update_one_1col(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_update_one(
|
|
table="tablename",
|
|
keyvalues={"keycol": "TheKey"},
|
|
updatevalues={"columnname": "New Value"},
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"UPDATE tablename SET columnname = ? WHERE keycol = ?",
|
|
["New Value", "TheKey"],
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_update_one_4cols(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_update_one(
|
|
table="tablename",
|
|
keyvalues=OrderedDict([("colA", 1), ("colB", 2)]),
|
|
updatevalues=OrderedDict([("colC", 3), ("colD", 4)]),
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"UPDATE tablename SET colC = ?, colD = ? WHERE" " colA = ? AND colB = ?",
|
|
[3, 4, 1, 2],
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_update_many(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_update_many(
|
|
table="tablename",
|
|
key_names=("col1", "col2"),
|
|
key_values=[("val1", "val2")],
|
|
value_names=("col3",),
|
|
value_values=[("val3",)],
|
|
desc="",
|
|
)
|
|
)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.mock_execute_batch.assert_called_once_with(
|
|
self.mock_txn,
|
|
"UPDATE tablename SET col3 = ? WHERE col1 = ? AND col2 = ?",
|
|
[("val3", "val1", "val2")],
|
|
)
|
|
else:
|
|
self.mock_txn.executemany.assert_called_once_with(
|
|
"UPDATE tablename SET col3 = ? WHERE col1 = ? AND col2 = ?",
|
|
[("val3", "val1", "val2")],
|
|
)
|
|
|
|
# key_values and value_values must be the same length.
|
|
with self.assertRaises(ValueError):
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_update_many(
|
|
table="tablename",
|
|
key_names=("col1", "col2"),
|
|
key_values=[("val1", "val2")],
|
|
value_names=("col3",),
|
|
value_values=[],
|
|
desc="",
|
|
)
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_update_many_no_iterable(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_update_many(
|
|
table="tablename",
|
|
key_names=("col1", "col2"),
|
|
key_values=[],
|
|
value_names=("col3",),
|
|
value_values=[],
|
|
desc="",
|
|
)
|
|
)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.mock_execute_batch.assert_not_called()
|
|
else:
|
|
self.mock_txn.executemany.assert_not_called()
|
|
|
|
@defer.inlineCallbacks
|
|
def test_delete_one(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_delete_one(
|
|
table="tablename", keyvalues={"keycol": "Go away"}
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"DELETE FROM tablename WHERE keycol = ?", ["Go away"]
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_delete_many(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 2
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_delete_many(
|
|
table="tablename",
|
|
column="col1",
|
|
iterable=("val1", "val2"),
|
|
keyvalues={"col2": "val3"},
|
|
desc="",
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"DELETE FROM tablename WHERE col1 = ANY(?) AND col2 = ?",
|
|
[["val1", "val2"], "val3"],
|
|
)
|
|
self.assertEqual(result, 2)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_delete_many_no_iterable(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_delete_many(
|
|
table="tablename",
|
|
column="col1",
|
|
iterable=(),
|
|
keyvalues={"col2": "val3"},
|
|
desc="",
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_not_called()
|
|
self.assertEqual(result, 0)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_delete_many_no_keyvalues(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 2
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_delete_many(
|
|
table="tablename",
|
|
column="col1",
|
|
iterable=("val1", "val2"),
|
|
keyvalues={},
|
|
desc="",
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"DELETE FROM tablename WHERE col1 = ANY(?)", [["val1", "val2"]]
|
|
)
|
|
self.assertEqual(result, 2)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert(
|
|
table="tablename",
|
|
keyvalues={"columnname": "oldvalue"},
|
|
values={"othercol": "newvalue"},
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"INSERT INTO tablename (columnname, othercol) VALUES (?, ?) ON CONFLICT (columnname) DO UPDATE SET othercol=EXCLUDED.othercol",
|
|
["oldvalue", "newvalue"],
|
|
)
|
|
self.assertTrue(result)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_no_values(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert(
|
|
table="tablename",
|
|
keyvalues={"columnname": "value"},
|
|
values={},
|
|
insertion_values={"columnname": "value"},
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"INSERT INTO tablename (columnname) VALUES (?) ON CONFLICT (columnname) DO NOTHING",
|
|
["value"],
|
|
)
|
|
self.assertTrue(result)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_with_insertion(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert(
|
|
table="tablename",
|
|
keyvalues={"columnname": "oldvalue"},
|
|
values={"othercol": "newvalue"},
|
|
insertion_values={"thirdcol": "insertionval"},
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"INSERT INTO tablename (columnname, thirdcol, othercol) VALUES (?, ?, ?) ON CONFLICT (columnname) DO UPDATE SET othercol=EXCLUDED.othercol",
|
|
["oldvalue", "insertionval", "newvalue"],
|
|
)
|
|
self.assertTrue(result)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_with_where(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.mock_txn.rowcount = 1
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert(
|
|
table="tablename",
|
|
keyvalues={"columnname": "oldvalue"},
|
|
values={"othercol": "newvalue"},
|
|
where_clause="thirdcol IS NULL",
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"INSERT INTO tablename (columnname, othercol) VALUES (?, ?) ON CONFLICT (columnname) WHERE thirdcol IS NULL DO UPDATE SET othercol=EXCLUDED.othercol",
|
|
["oldvalue", "newvalue"],
|
|
)
|
|
self.assertTrue(result)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_many(self) -> Generator["defer.Deferred[object]", object, None]:
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert_many(
|
|
table="tablename",
|
|
key_names=["keycol1", "keycol2"],
|
|
key_values=[["keyval1", "keyval2"], ["keyval3", "keyval4"]],
|
|
value_names=["valuecol3"],
|
|
value_values=[["val5"], ["val6"]],
|
|
desc="",
|
|
)
|
|
)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.mock_execute_values.assert_called_once_with(
|
|
self.mock_txn,
|
|
"INSERT INTO tablename (keycol1, keycol2, valuecol3) VALUES ? ON CONFLICT (keycol1, keycol2) DO UPDATE SET valuecol3=EXCLUDED.valuecol3",
|
|
[("keyval1", "keyval2", "val5"), ("keyval3", "keyval4", "val6")],
|
|
template=None,
|
|
fetch=False,
|
|
)
|
|
else:
|
|
self.mock_txn.executemany.assert_called_once_with(
|
|
"INSERT INTO tablename (keycol1, keycol2, valuecol3) VALUES (?, ?, ?) ON CONFLICT (keycol1, keycol2) DO UPDATE SET valuecol3=EXCLUDED.valuecol3",
|
|
[("keyval1", "keyval2", "val5"), ("keyval3", "keyval4", "val6")],
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_many_no_values(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert_many(
|
|
table="tablename",
|
|
key_names=["columnname"],
|
|
key_values=[["oldvalue"]],
|
|
value_names=[],
|
|
value_values=[],
|
|
desc="",
|
|
)
|
|
)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.mock_execute_values.assert_called_once_with(
|
|
self.mock_txn,
|
|
"INSERT INTO tablename (columnname) VALUES ? ON CONFLICT (columnname) DO NOTHING",
|
|
[("oldvalue",)],
|
|
template=None,
|
|
fetch=False,
|
|
)
|
|
else:
|
|
self.mock_txn.executemany.assert_called_once_with(
|
|
"INSERT INTO tablename (columnname) VALUES (?) ON CONFLICT (columnname) DO NOTHING",
|
|
[("oldvalue",)],
|
|
)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_emulated_no_values_exists(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
|
|
|
|
self.mock_txn.fetchall.return_value = [(1,)]
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert(
|
|
table="tablename",
|
|
keyvalues={"columnname": "value"},
|
|
values={},
|
|
insertion_values={"columnname": "value"},
|
|
)
|
|
)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.mock_txn.execute.assert_has_calls(
|
|
[
|
|
call("LOCK TABLE tablename in EXCLUSIVE MODE", ()),
|
|
call("SELECT 1 FROM tablename WHERE columnname = ?", ["value"]),
|
|
]
|
|
)
|
|
else:
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"SELECT 1 FROM tablename WHERE columnname = ?", ["value"]
|
|
)
|
|
self.assertFalse(result)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_emulated_no_values_not_exists(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
|
|
|
|
self.mock_txn.fetchall.return_value = []
|
|
self.mock_txn.rowcount = 1
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert(
|
|
table="tablename",
|
|
keyvalues={"columnname": "value"},
|
|
values={},
|
|
insertion_values={"columnname": "value"},
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_has_calls(
|
|
[
|
|
call(
|
|
"SELECT 1 FROM tablename WHERE columnname = ?",
|
|
["value"],
|
|
),
|
|
call("INSERT INTO tablename (columnname) VALUES (?)", ["value"]),
|
|
],
|
|
)
|
|
self.assertTrue(result)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_emulated_with_insertion_exists(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
|
|
|
|
self.mock_txn.rowcount = 1
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert(
|
|
table="tablename",
|
|
keyvalues={"columnname": "oldvalue"},
|
|
values={"othercol": "newvalue"},
|
|
insertion_values={"thirdcol": "insertionval"},
|
|
)
|
|
)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.mock_txn.execute.assert_has_calls(
|
|
[
|
|
call("LOCK TABLE tablename in EXCLUSIVE MODE", ()),
|
|
call(
|
|
"UPDATE tablename SET othercol = ? WHERE columnname = ?",
|
|
["newvalue", "oldvalue"],
|
|
),
|
|
]
|
|
)
|
|
else:
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"UPDATE tablename SET othercol = ? WHERE columnname = ?",
|
|
["newvalue", "oldvalue"],
|
|
)
|
|
self.assertTrue(result)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_emulated_with_insertion_not_exists(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
|
|
|
|
self.mock_txn.rowcount = 0
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert(
|
|
table="tablename",
|
|
keyvalues={"columnname": "oldvalue"},
|
|
values={"othercol": "newvalue"},
|
|
insertion_values={"thirdcol": "insertionval"},
|
|
)
|
|
)
|
|
|
|
self.mock_txn.execute.assert_has_calls(
|
|
[
|
|
call(
|
|
"UPDATE tablename SET othercol = ? WHERE columnname = ?",
|
|
["newvalue", "oldvalue"],
|
|
),
|
|
call(
|
|
"INSERT INTO tablename (columnname, othercol, thirdcol) VALUES (?, ?, ?)",
|
|
["oldvalue", "newvalue", "insertionval"],
|
|
),
|
|
]
|
|
)
|
|
self.assertTrue(result)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_emulated_with_where(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
|
|
|
|
self.mock_txn.rowcount = 1
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert(
|
|
table="tablename",
|
|
keyvalues={"columnname": "oldvalue"},
|
|
values={"othercol": "newvalue"},
|
|
where_clause="thirdcol IS NULL",
|
|
)
|
|
)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.mock_txn.execute.assert_has_calls(
|
|
[
|
|
call("LOCK TABLE tablename in EXCLUSIVE MODE", ()),
|
|
call(
|
|
"UPDATE tablename SET othercol = ? WHERE columnname = ? AND thirdcol IS NULL",
|
|
["newvalue", "oldvalue"],
|
|
),
|
|
]
|
|
)
|
|
else:
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"UPDATE tablename SET othercol = ? WHERE columnname = ? AND thirdcol IS NULL",
|
|
["newvalue", "oldvalue"],
|
|
)
|
|
self.assertTrue(result)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_upsert_emulated_with_where_no_values(
|
|
self,
|
|
) -> Generator["defer.Deferred[object]", object, None]:
|
|
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
|
|
|
|
self.mock_txn.rowcount = 1
|
|
|
|
result = yield defer.ensureDeferred(
|
|
self.datastore.db_pool.simple_upsert(
|
|
table="tablename",
|
|
keyvalues={"columnname": "oldvalue"},
|
|
values={},
|
|
where_clause="thirdcol IS NULL",
|
|
)
|
|
)
|
|
|
|
if USE_POSTGRES_FOR_TESTS:
|
|
self.mock_txn.execute.assert_has_calls(
|
|
[
|
|
call("LOCK TABLE tablename in EXCLUSIVE MODE", ()),
|
|
call(
|
|
"SELECT 1 FROM tablename WHERE columnname = ? AND thirdcol IS NULL",
|
|
["oldvalue"],
|
|
),
|
|
]
|
|
)
|
|
else:
|
|
self.mock_txn.execute.assert_called_once_with(
|
|
"SELECT 1 FROM tablename WHERE columnname = ? AND thirdcol IS NULL",
|
|
["oldvalue"],
|
|
)
|
|
self.assertFalse(result)
|