Add parse_row method to replication stream class

This will allow individual stream classes to override how a row is parsed.
This commit is contained in:
Richard van der Hoff 2019-03-27 07:40:32 +00:00
parent 91c3513668
commit f570916a3e
3 changed files with 19 additions and 3 deletions

View File

@ -105,13 +105,14 @@ class ReplicationClientHandler(object):
def on_rdata(self, stream_name, token, rows):
"""Called to handle a batch of replication data with a given stream token.
By default this just pokes the slave store. Can be overriden in subclasses to
By default this just pokes the slave store. Can be overridden in subclasses to
handle more.
Args:
stream_name (str): name of the replication stream for this batch of rows
token (int): stream token for this batch of rows
rows (list): a list of Stream.ROW_TYPE objects.
rows (list): a list of Stream.ROW_TYPE objects as returned by
Stream.parse_row.
Returns:
Deferred|None

View File

@ -605,7 +605,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
inbound_rdata_count.labels(stream_name).inc()
try:
row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
row = STREAMS_MAP[stream_name].parse_row(cmd.row)
except Exception:
logger.exception(
"[%s] Failed to parse RDATA: %r %r",

View File

@ -115,6 +115,21 @@ class Stream(object):
ROW_TYPE = None # The type of the row
_LIMITED = True # Whether the update function takes a limit
@classmethod
def parse_row(cls, row):
"""Parse a row received over replication
By default, assumes that the row data is an array object and passes its contents
to the constructor of the ROW_TYPE for this stream.
Args:
row: row data from the incoming RDATA command, after json decoding
Returns:
ROW_TYPE object for this stream
"""
return cls.ROW_TYPE(*row)
def __init__(self, hs):
# The token from which we last asked for updates
self.last_token = self.current_token()