fix: [code] MISP Galaxy skips duplicate values

This commit is contained in:
Christophe Vandeplas 2023-12-21 09:16:38 +01:00
parent 197dcd3aea
commit 352e5411ec
No known key found for this signature in database
GPG Key ID: BDC48619FFDC5A5B

View File

@ -25,9 +25,6 @@ class DisarmGalaxy:
self.disarm = Disarm()
self.out_path = out_path
self.all_data = {} # variable that will contain everything
self.all_data_uuid = {} # used to compute references
self.galaxy_types = ['techniques', 'countermeasures', 'detections', 'actortypes']
def generate_all_galaxies(self):
@ -35,12 +32,8 @@ class DisarmGalaxy:
getattr(self, f'generate_{galaxy_type}_galaxy')() # also saves the files
def generate_all_clusters(self):
# first build up the data
for galaxy_type in self.galaxy_types:
getattr(self, f'generate_{galaxy_type}_clusters')()
# write all to files
for galaxy_type in self.galaxy_types:
self.write_json_file(os.path.join(self.out_path, 'clusters', f'disarm-{galaxy_type}.json'), self.all_data[galaxy_type])
def write_json_file(self, fname, file_data):
with open(fname, 'w') as f:
@ -77,8 +70,13 @@ class DisarmGalaxy:
'values': [],
'version': 1}
values = []
seen_values = []
df = self.disarm.df_techniques
for i in range(len(df)):
if df.values[i][1] in seen_values: # remove duplicates
continue
seen_values.append(df.values[i][1])
entry_id = df.values[i][0]
kill_chain = [f'tactics:{self.disarm.tactics[df.values[i][3]]}']
related = []
@ -89,9 +87,6 @@ class DisarmGalaxy:
related_id = row['disarm_id']
related.append({
"dest-uuid": str(uuid.uuid5(uuid.UUID(CORE_UUID), related_id)),
# "tags": [
# "estimative-language:likelihood-probability=\"almost-certain\""
# ],
"type": "blocked-by" # mitigated-by would be cleaner, but does not exist as relationship type
})
# Detections relations
@ -101,9 +96,6 @@ class DisarmGalaxy:
related_id = row['disarm_id']
related.append({
"dest-uuid": str(uuid.uuid5(uuid.UUID(CORE_UUID), related_id)),
# "tags": [
# "estimative-language:likelihood-probability=\"almost-certain\""
# ],
"type": "detected-by"
})
@ -121,10 +113,9 @@ class DisarmGalaxy:
'related': related
}
values.append(value)
self.all_data_uuid[value['uuid']] = value
cluster['values'] = sorted(values, key=lambda x: x['meta']['external_id'])
self.all_data[galaxy_type] = cluster
self.write_json_file(os.path.join(self.out_path, 'clusters', f'disarm-{galaxy_type}.json'), cluster)
def generate_countermeasures_galaxy(self):
galaxy_type = 'countermeasures'
@ -162,8 +153,13 @@ class DisarmGalaxy:
'values': [],
'version': 1}
values = []
seen_values = []
df = self.disarm.df_counters
for i in range(len(df)):
if df.values[i][1] in seen_values: # remove duplicates
continue
seen_values.append(df.values[i][1])
entry_id = df.values[i][0]
kill_chain = []
if self.disarm.tactics[df.values[i][15]]:
@ -181,9 +177,6 @@ class DisarmGalaxy:
related_id = row['technique_id']
related.append({
"dest-uuid": str(uuid.uuid5(uuid.UUID(CORE_UUID), related_id)),
# "tags": [
# "estimative-language:likelihood-probability=\"almost-certain\""
# ],
"type": "blocks" # mitigated would be cleaner, but mitigated-by does not exist as relationship type
})
# Actortype relations
@ -193,9 +186,6 @@ class DisarmGalaxy:
related_id = row['actortype_id']
related.append({
"dest-uuid": str(uuid.uuid5(uuid.UUID(CORE_UUID), related_id)),
# "tags": [
# "estimative-language:likelihood-probability=\"almost-certain\""
# ],
"type": "affected-by"
# mitigated-by would be cleaner, but mitigated-by does not exist as relationship type
})
@ -214,10 +204,9 @@ class DisarmGalaxy:
'related': related
}
values.append(value)
self.all_data_uuid[value['uuid']] = value
cluster['values'] = sorted(values, key=lambda x: x['meta']['external_id'])
self.all_data[galaxy_type] = cluster
self.write_json_file(os.path.join(self.out_path, 'clusters', f'disarm-{galaxy_type}.json'), cluster)
def generate_detections_galaxy(self):
galaxy_type = 'detections'
@ -255,8 +244,13 @@ class DisarmGalaxy:
'values': [],
'version': 1}
values = []
seen_values = []
df = self.disarm.df_detections
for i in range(len(df)):
if df.values[i][1] in seen_values: # remove duplicates
continue
seen_values.append(df.values[i][1])
entry_id = df.values[i][0]
kill_chain = []
try:
@ -281,9 +275,6 @@ class DisarmGalaxy:
related_id = row['technique_id']
related.append({
"dest-uuid": str(uuid.uuid5(uuid.UUID(CORE_UUID), related_id)),
# "tags": [
# "estimative-language:likelihood-probability=\"almost-certain\""
# ],
"type": "detects"
})
# Actortypes relations
@ -293,9 +284,6 @@ class DisarmGalaxy:
related_id = row['actortype_id']
related.append({
"dest-uuid": str(uuid.uuid5(uuid.UUID(CORE_UUID), related_id)),
# "tags": [
# "estimative-language:likelihood-probability=\"almost-certain\""
# ],
"type": "detected-by"
# mitigated-by would be cleaner, but mitigated-by does not exist as relationship type
})
@ -314,10 +302,9 @@ class DisarmGalaxy:
'related': related
}
values.append(value)
self.all_data_uuid[value['uuid']] = value
cluster['values'] = sorted(values, key=lambda x: x['meta']['external_id'])
self.all_data[galaxy_type] = cluster
self.write_json_file(os.path.join(self.out_path, 'clusters', f'disarm-{galaxy_type}.json'), cluster)
def generate_actortypes_galaxy(self):
galaxy_type = 'actortypes'
@ -349,8 +336,13 @@ class DisarmGalaxy:
'values': [],
'version': 1}
values = []
seen_values = []
df = self.disarm.df_actortypes
for i in range(len(df)):
if df.values[i][1] in seen_values: # remove duplicates
continue
seen_values.append(df.values[i][1])
entry_id = df.values[i][0]
kill_chain = []
try:
@ -370,9 +362,6 @@ class DisarmGalaxy:
related_id = row['disarm_id']
related.append({
"dest-uuid": str(uuid.uuid5(uuid.UUID(CORE_UUID), related_id)),
# "tags": [
# "estimative-language:likelihood-probability=\"almost-certain\""
# ],
"type": "affects"
})
# Detections relations
@ -382,9 +371,6 @@ class DisarmGalaxy:
related_id = row['disarm_id']
related.append({
"dest-uuid": str(uuid.uuid5(uuid.UUID(CORE_UUID), related_id)),
# "tags": [
# "estimative-language:likelihood-probability=\"almost-certain\""
# ],
"type": "detects"
})
@ -403,9 +389,8 @@ class DisarmGalaxy:
}
values.append(value)
self.all_data_uuid[value['uuid']] = value
cluster['values'] = sorted(values, key=lambda x: x['meta']['external_id'])
self.all_data[galaxy_type] = cluster
self.write_json_file(os.path.join(self.out_path, 'clusters', f'disarm-{galaxy_type}.json'), cluster)
def main():