DISARMframeworks/CODE/generate_DISARM_MISP_galaxy.py

406 lines
17 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Generate DISARM MISP galaxy
see also https://github.com/MISP/misp-galaxy
Author: Christophe Vandeplas
License: AGPL-3
"""
from generate_DISARM_pages import Disarm
import json
import uuid
import os
# Description text written to the 'description' field of every generated galaxy and cluster file.
DISARM_DESCRIPTION = 'DISARM is a framework designed for describing and understanding disinformation incidents.'
# Value written to the 'category' field of every generated cluster file.
DISARM_CATEGORY = 'disarm'
# Value written to the 'authors' field of every generated cluster file.
DISARM_AUTHORS = ['DISARM Project']
# Value written to the 'source' field of every generated cluster file.
DISARM_SOURCE = 'https://github.com/DISARMFoundation/DISARMframeworks'
# Namespace passed to uuid.uuid5() for every UUID this script derives; because uuid5 is
# deterministic, the same DISARM ID always maps to the same UUID across runs.
CORE_UUID = "9d6bd9d2-2cd3-4900-b61a-06cd64df3996"
class DisarmGalaxy:
    """Export the DISARM framework as MISP galaxy and cluster JSON files.

    For each entry of ``galaxy_types`` there is a ``generate_<type>_galaxy``
    method (writes ``<out_path>/galaxies/disarm-<type>.json``) and a
    ``generate_<type>_clusters`` method (writes
    ``<out_path>/clusters/disarm-<type>.json``).

    Attributes:
        disarm: ``Disarm`` instance providing the lookup mappings (``tactics``,
            ``responsetypes``, ``metatechniques``, ``sectors``), the object
            tables (``df_*``) and the cross-reference tables (``cross_*``).
            The tables are used through the pandas DataFrame API.
        out_path: root directory of the MISP galaxy checkout to write into.
        galaxy_types: names used to dispatch to the ``generate_*`` methods.
    """

    def __init__(self, out_path=os.path.join('..', '..', 'misp-galaxy')):
        """Load the DISARM data and remember where the output must be written.

        Args:
            out_path: directory that contains the ``galaxies`` and ``clusters``
                sub-directories the JSON files are written to.
        """
        self.disarm = Disarm()
        self.out_path = out_path
        self.galaxy_types = ['techniques', 'countermeasures', 'detections', 'actortypes']

    def generate_all_galaxies(self):
        """Call ``generate_<type>_galaxy`` for every entry of ``galaxy_types``."""
        for galaxy_type in self.galaxy_types:
            getattr(self, f'generate_{galaxy_type}_galaxy')()  # also saves the files

    def generate_all_clusters(self):
        """Call ``generate_<type>_clusters`` for every entry of ``galaxy_types``."""
        for galaxy_type in self.galaxy_types:
            getattr(self, f'generate_{galaxy_type}_clusters')()

    def write_json_file(self, fname, file_data):
        """Serialise ``file_data`` to ``fname`` as indented, key-sorted JSON.

        Args:
            fname: path of the file to (over)write.
            file_data: JSON-serialisable object.
        """
        # ensure_ascii=False emits non-ASCII characters verbatim, so the file
        # encoding must be pinned instead of depending on the platform locale.
        with open(fname, 'w', encoding='utf-8') as f:
            json.dump(file_data, f, indent=2, sort_keys=True, ensure_ascii=False)
            f.write('\n')

    # ------------------------------------------------------------------
    # Private helpers shared by the generate_* methods
    # ------------------------------------------------------------------
    @staticmethod
    def _uuid(name):
        """Return the deterministic UUID (as a string) of ``name`` in the CORE_UUID namespace."""
        return str(uuid.uuid5(uuid.UUID(CORE_UUID), name))

    @staticmethod
    def _unique_rows(df):
        """Yield the rows of ``df.values`` in order, skipping repeated names.

        A row is a duplicate when its second cell (index 1, the value later
        exported as the cluster entry's ``value``) was already seen.
        """
        seen_values = set()
        # Read df.values once: re-evaluating it per cell, as a per-index loop
        # would, may rebuild the whole array on every access.
        for row in df.values:
            if row[1] in seen_values:  # remove duplicates
                continue
            seen_values.add(row[1])
            yield row

    @staticmethod
    def _kill_chain_entry(scope, lookup, key, skip_unknown=False):
        """Return ``['<scope>:<name>']`` for ``lookup[key]``, or ``[]`` if the name is falsy.

        Args:
            scope: kill-chain scope prefix, e.g. ``'tactics'``.
            lookup: mapping from ID to display name.
            key: ID to resolve.
            skip_unknown: when true an unknown ``key`` yields ``[]``; otherwise
                the ``KeyError`` propagates to the caller.

        Raises:
            KeyError: if ``key`` is not in ``lookup`` and ``skip_unknown`` is false.
        """
        try:
            name = lookup[key]
        except KeyError:
            if skip_unknown:
                return []
            raise
        return [f'{scope}:{name}'] if name else []

    def _sectors_kill_chain(self, sectors_cell):
        """Return the ``sectors:<name>`` entries for a comma-separated sector cell.

        Each sector is resolved independently, so an unknown sector is skipped
        without discarding the sectors listed after it.
        """
        kill_chain = []
        for sector in sectors_cell.split(','):
            kill_chain.extend(
                self._kill_chain_entry('sectors', self.disarm.sectors, sector.strip(), skip_unknown=True))
        return kill_chain

    def _relations(self, cross_table, match_column, target_column, entry_id, relation_type):
        """Build the ``related`` entries of one cluster value from a cross-reference table.

        Rows whose ``match_column`` equals ``entry_id`` are selected and sorted
        by ``target_column``; each one becomes a relation of ``relation_type``
        pointing at the UUID derived from its ``target_column`` value.
        """
        mapping = cross_table[cross_table[match_column] == entry_id]
        return [{"dest-uuid": self._uuid(related_id), "type": relation_type}
                for related_id in mapping.sort_values(target_column)[target_column]]

    def _cluster_value(self, pages_dir, entry_id, name, description, kill_chain, related):
        """Assemble one element of a cluster's ``values`` list.

        ``pages_dir`` is the sub-directory of ``generated_pages`` used in the
        reference URL of the entry.
        """
        return {
            'uuid': self._uuid(entry_id),
            'value': name,
            'description': description,
            'meta': {
                'external_id': entry_id,
                'kill_chain': kill_chain,
                'refs': [
                    f'https://github.com/DISARMFoundation/DISARMframeworks/blob/main/generated_pages/{pages_dir}/{entry_id}.md'
                ]
            },
            'related': related
        }

    def _write_galaxy(self, galaxy_type, name, icon, kill_chain_sources):
        """Write ``galaxies/disarm-<galaxy_type>.json``.

        Args:
            galaxy_type: suffix used in the galaxy type, UUID seed and file name.
            name: display name of the galaxy.
            icon: icon identifier stored in the galaxy.
            kill_chain_sources: mapping of kill-chain scope to the lookup
                mapping whose values (in iteration order) form that scope.
        """
        galaxy = {'name': name,
                  'type': f'disarm-{galaxy_type}',
                  'description': DISARM_DESCRIPTION,
                  'uuid': self._uuid(f'disarm-galaxy-{galaxy_type}'),
                  'version': 1,
                  'icon': icon,
                  'namespace': 'disarm',
                  'kill_chain_order': {
                      scope: [f'{v}' for _, v in lookup.items()]
                      for scope, lookup in kill_chain_sources.items()
                  }}
        self.write_json_file(os.path.join(self.out_path, 'galaxies', f'disarm-{galaxy_type}.json'), galaxy)

    def _write_cluster(self, galaxy_type, name, values):
        """Write ``clusters/disarm-<galaxy_type>.json`` with ``values`` sorted by external ID."""
        cluster = {'authors': DISARM_AUTHORS,
                   'category': DISARM_CATEGORY,
                   'description': DISARM_DESCRIPTION,
                   'name': name,
                   'source': DISARM_SOURCE,
                   'type': f'disarm-{galaxy_type}',
                   'uuid': self._uuid(f'disarm-cluster-{galaxy_type}'),
                   'values': sorted(values, key=lambda x: x['meta']['external_id']),
                   'version': 1}
        self.write_json_file(os.path.join(self.out_path, 'clusters', f'disarm-{galaxy_type}.json'), cluster)

    # ------------------------------------------------------------------
    # Techniques
    # ------------------------------------------------------------------
    def generate_techniques_galaxy(self):
        """Write the Techniques galaxy; its kill chain lists the tactic names."""
        self._write_galaxy('techniques', 'Techniques', 'map',
                           {'tactics': self.disarm.tactics})

    def generate_techniques_clusters(self):
        """Write the Techniques cluster, linking techniques to counters and detections.

        Raises:
            KeyError: if a technique references a tactic ID missing from ``disarm.tactics``.
        """
        galaxy_type = 'techniques'
        values = []
        for row in self._unique_rows(self.disarm.df_techniques):
            entry_id = row[0]
            kill_chain = [f'tactics:{self.disarm.tactics[row[3]]}']
            related = []
            # Countermeasures relations
            # mitigated-by would be cleaner, but does not exist as relationship type
            related += self._relations(self.disarm.cross_counterid_techniqueid,
                                       'technique_id', 'disarm_id', entry_id, "blocked-by")
            # Detections relations
            related += self._relations(self.disarm.cross_detectionid_techniqueid,
                                       'technique_id', 'disarm_id', entry_id, "detected-by")
            values.append(self._cluster_value(galaxy_type, entry_id, row[1], row[4], kill_chain, related))
        self._write_cluster(galaxy_type, 'Techniques', values)

    # ------------------------------------------------------------------
    # Countermeasures
    # ------------------------------------------------------------------
    def generate_countermeasures_galaxy(self):
        """Write the Countermeasures galaxy (tactics, response types, metatechniques)."""
        self._write_galaxy('countermeasures', 'Countermeasures', 'shield-alt',
                           {'tactics': self.disarm.tactics,
                            'responsetypes': self.disarm.responsetypes,
                            'metatechniques': self.disarm.metatechniques})

    def generate_countermeasures_clusters(self):
        """Write the Countermeasures cluster, linking counters to techniques and actor types.

        Raises:
            KeyError: if a counter references a tactic, response type or
                metatechnique ID missing from the corresponding lookup.
        """
        galaxy_type = 'countermeasures'
        values = []
        for row in self._unique_rows(self.disarm.df_counters):
            entry_id = row[0]
            kill_chain = []
            kill_chain += self._kill_chain_entry('tactics', self.disarm.tactics, row[15])
            kill_chain += self._kill_chain_entry('responsetypes', self.disarm.responsetypes, row[10])
            kill_chain += self._kill_chain_entry('metatechniques', self.disarm.metatechniques, row[17])
            related = []
            # Techniques relations
            # mitigated would be cleaner, but mitigated-by does not exist as relationship type
            related += self._relations(self.disarm.cross_counterid_techniqueid,
                                       'disarm_id', 'technique_id', entry_id, "blocks")
            # Actortype relations
            # mitigated-by would be cleaner, but mitigated-by does not exist as relationship type
            related += self._relations(self.disarm.cross_counterid_actortypeid,
                                       'disarm_id', 'actortype_id', entry_id, "affected-by")
            # The generated pages for countermeasures live under 'counters'.
            values.append(self._cluster_value('counters', entry_id, row[1], row[3], kill_chain, related))
        self._write_cluster(galaxy_type, 'Countermeasures', values)

    # ------------------------------------------------------------------
    # Detections
    # ------------------------------------------------------------------
    def generate_detections_galaxy(self):
        """Write the Detections galaxy (tactics and response types only)."""
        # Metatechniques are intentionally not part of the detections kill chain.
        self._write_galaxy('detections', 'Detections', 'bell',
                           {'tactics': self.disarm.tactics,
                            'responsetypes': self.disarm.responsetypes})

    def generate_detections_clusters(self):
        """Write the Detections cluster, linking detections to techniques and actor types.

        Unknown tactic or response type IDs are skipped rather than raising.
        """
        galaxy_type = 'detections'
        values = []
        for row in self._unique_rows(self.disarm.df_detections):
            entry_id = row[0]
            kill_chain = []
            kill_chain += self._kill_chain_entry('tactics', self.disarm.tactics, row[14], skip_unknown=True)
            kill_chain += self._kill_chain_entry('responsetypes', self.disarm.responsetypes, row[10],
                                                 skip_unknown=True)
            # Metatechnique ID is not in the array, so no metatechniques entry is added.
            related = []
            # Techniques relations
            related += self._relations(self.disarm.cross_detectionid_techniqueid,
                                       'disarm_id', 'technique_id', entry_id, "detects")
            # Actortypes relations
            # mitigated-by would be cleaner, but mitigated-by does not exist as relationship type
            related += self._relations(self.disarm.cross_detectionid_actortypeid,
                                       'disarm_id', 'actortype_id', entry_id, "detected-by")
            values.append(self._cluster_value(galaxy_type, entry_id, row[1], row[3], kill_chain, related))
        self._write_cluster(galaxy_type, 'Detections', values)

    # ------------------------------------------------------------------
    # Actor types
    # ------------------------------------------------------------------
    def generate_actortypes_galaxy(self):
        """Write the Actor Types galaxy; its kill chain lists the sector names."""
        self._write_galaxy('actortypes', 'Actor Types', 'user-secret',
                           {'sectors': self.disarm.sectors})

    def generate_actortypes_clusters(self):
        """Write the Actor Types cluster, linking actor types to counters and detections.

        The fourth cell of each row (index 3) is split on commas into sector
        IDs; unknown sector IDs are skipped individually.
        """
        galaxy_type = 'actortypes'
        values = []
        for row in self._unique_rows(self.disarm.df_actortypes):
            entry_id = row[0]
            kill_chain = self._sectors_kill_chain(row[3])
            related = []
            # Countermeasures relations
            related += self._relations(self.disarm.cross_counterid_actortypeid,
                                       'actortype_id', 'disarm_id', entry_id, "affects")
            # Detections relations
            related += self._relations(self.disarm.cross_detectionid_actortypeid,
                                       'actortype_id', 'disarm_id', entry_id, "detects")
            values.append(self._cluster_value(galaxy_type, entry_id, row[1], row[2], kill_chain, related))
        self._write_cluster(galaxy_type, 'Actor Types', values)
def main():
    """Create a DisarmGalaxy exporter and write all galaxy files, then all cluster files."""
    exporter = DisarmGalaxy()
    for generate in (exporter.generate_all_galaxies, exporter.generate_all_clusters):
        generate()
if __name__ == "__main__":
    # Script entry point: generate everything, then remind the operator of the
    # manual follow-up steps.
    # NOTE(review): both reminder messages are assumed to belong inside this
    # guard (the source's indentation was lost) - confirm.
    main()
    print("All done, please look at the delta, and update the version number if needed.",
          "After that do ./jq_all_the_things.sh, commit, and then ./validate_all.sh.",
          sep='\n')