Merge pull request #2308 from matrix-org/erikj/user_ip_repl

Make workers report to master for user ip updates
This commit is contained in:
Erik Johnston 2017-06-27 15:36:47 +01:00 committed by GitHub
commit 4b444723f0
9 changed files with 111 additions and 8 deletions

View File

@ -24,6 +24,7 @@ from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.room import RoomStore
@ -33,7 +34,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import PublicRoomListRestServlet from synapse.rest.client.v1.room import PublicRoomListRestServlet
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
@ -65,8 +65,8 @@ class ClientReaderSlavedStore(
SlavedApplicationServiceStore, SlavedApplicationServiceStore,
SlavedRegistrationStore, SlavedRegistrationStore,
TransactionStore, TransactionStore,
SlavedClientIpStore,
BaseSlavedStore, BaseSlavedStore,
ClientIpStore, # After BaseSlavedStore because the constructor is different
): ):
pass pass

View File

@ -23,13 +23,13 @@ from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.storage.media_repository import MediaRepositoryStore from synapse.storage.media_repository import MediaRepositoryStore
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
@ -60,10 +60,10 @@ logger = logging.getLogger("synapse.app.media_repository")
class MediaRepositorySlavedStore( class MediaRepositorySlavedStore(
SlavedApplicationServiceStore, SlavedApplicationServiceStore,
SlavedRegistrationStore, SlavedRegistrationStore,
SlavedClientIpStore,
TransactionStore, TransactionStore,
BaseSlavedStore, BaseSlavedStore,
MediaRepositoryStore, MediaRepositoryStore,
ClientIpStore,
): ):
pass pass

View File

@ -29,6 +29,7 @@ from synapse.rest.client.v1 import events
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@ -42,7 +43,6 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore from synapse.storage.roommember import RoomMemberStore
@ -77,9 +77,9 @@ class SynchrotronSlavedStore(
SlavedPresenceStore, SlavedPresenceStore,
SlavedDeviceInboxStore, SlavedDeviceInboxStore,
SlavedDeviceStore, SlavedDeviceStore,
SlavedClientIpStore,
RoomStore, RoomStore,
BaseSlavedStore, BaseSlavedStore,
ClientIpStore, # After BaseSlavedStore because the constructor is different
): ):
who_forgot_in_room = ( who_forgot_in_room = (
RoomMemberStore.__dict__["who_forgot_in_room"] RoomMemberStore.__dict__["who_forgot_in_room"]

View File

@ -26,12 +26,12 @@ from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v2_alpha import user_directory from synapse.rest.client.v2_alpha import user_directory
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.user_directory import UserDirectoryStore from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@ -58,9 +58,9 @@ class UserDirectorySlaveStore(
SlavedEventStore, SlavedEventStore,
SlavedApplicationServiceStore, SlavedApplicationServiceStore,
SlavedRegistrationStore, SlavedRegistrationStore,
SlavedClientIpStore,
UserDirectoryStore, UserDirectoryStore,
BaseSlavedStore, BaseSlavedStore,
ClientIpStore, # After BaseSlavedStore because the constructor is different
): ):
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
super(UserDirectorySlaveStore, self).__init__(db_conn, hs) super(UserDirectorySlaveStore, self).__init__(db_conn, hs)

View File

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import BaseSlavedStore
from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import Cache
class SlavedClientIpStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedClientIpStore, self).__init__(db_conn, hs)
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
max_entries=50000 * CACHE_SIZE_FACTOR,
)
def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
now = int(self._clock.time_msec())
user_id = user.to_string()
key = (user_id, access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)

View File

@ -20,6 +20,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
from .commands import ( from .commands import (
FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand, FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
UserIpCommand,
) )
from .protocol import ClientReplicationStreamProtocol from .protocol import ClientReplicationStreamProtocol
@ -178,6 +179,12 @@ class ReplicationClientHandler(object):
cmd = InvalidateCacheCommand(cache_func.__name__, keys) cmd = InvalidateCacheCommand(cache_func.__name__, keys)
self.send_command(cmd) self.send_command(cmd)
def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
"""Tell the master that the user made a request.
"""
cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
self.send_command(cmd)
def await_sync(self, data): def await_sync(self, data):
"""Returns a deferred that is resolved when we receive a SYNC command """Returns a deferred that is resolved when we receive a SYNC command
with given data. with given data.

View File

@ -304,6 +304,36 @@ class InvalidateCacheCommand(Command):
return " ".join((self.cache_func, json.dumps(self.keys))) return " ".join((self.cache_func, json.dumps(self.keys)))
class UserIpCommand(Command):
"""Sent periodically when a worker sees activity from a client.
Format::
USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
"""
NAME = "USER_IP"
def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
self.user_id = user_id
self.access_token = access_token
self.ip = ip
self.user_agent = user_agent
self.device_id = device_id
self.last_seen = last_seen
@classmethod
def from_line(cls, line):
user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5)
return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen))
def to_line(self):
return " ".join((
self.user_id, self.access_token, self.ip, self.device_id,
str(self.last_seen), self.user_agent,
))
# Map of command name to command type. # Map of command name to command type.
COMMAND_MAP = { COMMAND_MAP = {
cmd.NAME: cmd cmd.NAME: cmd
@ -320,6 +350,7 @@ COMMAND_MAP = {
SyncCommand, SyncCommand,
RemovePusherCommand, RemovePusherCommand,
InvalidateCacheCommand, InvalidateCacheCommand,
UserIpCommand,
) )
} }
@ -342,5 +373,6 @@ VALID_CLIENT_COMMANDS = (
FederationAckCommand.NAME, FederationAckCommand.NAME,
RemovePusherCommand.NAME, RemovePusherCommand.NAME,
InvalidateCacheCommand.NAME, InvalidateCacheCommand.NAME,
UserIpCommand.NAME,
ErrorCommand.NAME, ErrorCommand.NAME,
) )

View File

@ -406,6 +406,12 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
def on_INVALIDATE_CACHE(self, cmd): def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
def on_USER_IP(self, cmd):
self.streamer.on_user_ip(
cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
cmd.last_seen,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def subscribe_to_stream(self, stream_name, token): def subscribe_to_stream(self, stream_name, token):
"""Subscribe the remote to a streams. """Subscribe the remote to a streams.

View File

@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync")
federation_ack_counter = metrics.register_counter("federation_ack") federation_ack_counter = metrics.register_counter("federation_ack")
remove_pusher_counter = metrics.register_counter("remove_pusher") remove_pusher_counter = metrics.register_counter("remove_pusher")
invalidate_cache_counter = metrics.register_counter("invalidate_cache") invalidate_cache_counter = metrics.register_counter("invalidate_cache")
user_ip_cache_counter = metrics.register_counter("user_ip_cache")
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -238,6 +239,15 @@ class ReplicationStreamer(object):
invalidate_cache_counter.inc() invalidate_cache_counter.inc()
getattr(self.store, cache_func).invalidate(tuple(keys)) getattr(self.store, cache_func).invalidate(tuple(keys))
@measure_func("repl.on_user_ip")
def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
"""The client saw a user request
"""
user_ip_cache_counter.inc()
self.store.insert_client_ip(
user_id, access_token, ip, user_agent, device_id, last_seen,
)
def send_sync_to_all_connections(self, data): def send_sync_to_all_connections(self, data):
"""Sends a SYNC command to all clients. """Sends a SYNC command to all clients.