TC2-BBS-mesh/command_handlers.py

953 lines
41 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import configparser
import logging
import random
import time
import os
from meshtastic import BROADCAST_NUM
from db_operations import (
add_bulletin, add_mail, delete_mail,
get_bulletin_content, get_bulletins,
get_mail, get_mail_content,
add_channel, get_channels, get_sender_id_by_mail_id
)
from utils import (
get_node_id_from_num, get_node_info,
get_node_short_name, send_message,
update_user_state
)
# Load the menu layout from config.ini. Each option under [menu] is a
# comma-separated list of menu keys that build_menu() turns into menu lines.
config = configparser.ConfigParser()
config.read('config.ini')
_menu_section = config['menu']
main_menu_items = _menu_section['main_menu_items'].split(',')
bbs_menu_items = _menu_section['bbs_menu_items'].split(',')
utilities_menu_items = _menu_section['utilities_menu_items'].split(',')
def build_menu(items, menu_name):
    """Render a text menu: a title line followed by one line per known item.

    Args:
        items: Iterable of menu key strings. Surrounding whitespace is
            ignored and keys without a known label are skipped.
        menu_name: Title placed on the first line of the menu.

    Returns:
        The menu as one string in which every line ends with a newline.
    """
    labels = {
        'Q': "[Q]uick Commands",
        'B': "[B]BS",
        'U': "[U]tilities",
        'X': "E[X]IT",
        'M': "[M]ail",
        'C': "[C]hannel Dir",
        'J': "[J]S8CALL",
        'G': "[G]ames",
        'S': "[S]tats",
        'F': "[F]ortune",
        'W': "[W]all of Shame",
    }
    entries = (labels.get(key.strip()) for key in items)
    body = "".join(f"{entry}\n" for entry in entries if entry is not None)
    return f"{menu_name}\n" + body
def handle_help_command(sender_id, interface, menu_name=None):
    """Send a menu to the user and record which menu they are in.

    Args:
        sender_id: Identifier of the requesting user (passed to
            get_node_id_from_num and used as the message destination).
        interface: Mesh interface handed to the lookup and send helpers.
        menu_name: 'bbs' or 'utilities' to show that submenu. ``None`` or
            any unrecognised name shows the main menu.
    """
    submenus = {
        'bbs': (bbs_menu_items, "📰BBS Menu📰"),
        'utilities': (utilities_menu_items, "🛠Utilities Menu🛠"),
    }
    if menu_name in submenus:
        update_user_state(sender_id, {'command': 'MENU', 'menu': menu_name, 'step': 1})
        items, title = submenus[menu_name]
        response = build_menu(items, title)
    else:
        # An unrecognised menu name previously left `response` unbound and
        # raised UnboundLocalError; it now falls back to the main menu.
        update_user_state(sender_id, {'command': 'MAIN_MENU', 'step': 1})  # Reset to main menu state
        mail = get_mail(get_node_id_from_num(sender_id, interface))
        response = build_menu(main_menu_items, f"💾TC² BBS💾 (✉️:{len(mail)})")
    send_message(response, sender_id, interface)
def get_node_name(node_id, interface):
    """Return the long name stored for ``node_id``.

    Falls back to ``"Node <node_id>"`` when ``interface.nodes`` has no
    (truthy) entry under that key.
    """
    entry = interface.nodes.get(node_id)
    return entry['user']['longName'] if entry else f"Node {node_id}"
def handle_mail_command(sender_id, interface):
    """Send the mail menu and put the user at step 1 of the MAIL flow."""
    send_message(
        "Mail Menu✉\nWhat would you like to do with mail?\n[R]ead [S]end E[X]IT",
        sender_id,
        interface,
    )
    update_user_state(sender_id, {'command': 'MAIL', 'step': 1})
def handle_bulletin_command(sender_id, interface):
    """Send the bulletin board picker and put the user at step 1 of BULLETIN_MENU."""
    prompt = "\n".join((
        "📰Bulletin Menu📰",
        "Which board would you like to enter?",
        "[G]eneral [I]nfo [N]ews [U]rgent",
    ))
    send_message(prompt, sender_id, interface)
    update_user_state(sender_id, {'command': 'BULLETIN_MENU', 'step': 1})
def handle_exit_command(sender_id, interface):
    """Point the user at HELP and reset their stored state to ``None``."""
    farewell = "Type 'HELP' for a list of commands."
    send_message(farewell, sender_id, interface)
    update_user_state(sender_id, None)
def handle_stats_command(sender_id, interface):
    """Send the stats menu and put the user at step 1 of the STATS flow."""
    prompt = "\n".join((
        "📊Stats Menu📊",
        "What stats would you like to view?",
        "[N]odes [H]ardware [R]oles E[X]IT",
    ))
    send_message(prompt, sender_id, interface)
    update_user_state(sender_id, {'command': 'STATS', 'step': 1})
def handle_fortune_command(sender_id, interface):
    """Send the user one random line from ``fortunes.txt``.

    Blank lines in the file are ignored. When the file contains no usable
    line the user is told that no fortunes are available. Any error while
    reading the file is reported back to the user.
    """
    try:
        # Explicit UTF-8 keeps decoding independent of the platform locale,
        # the same way the game files in this module are opened.
        with open('fortunes.txt', 'r', encoding='utf-8') as file:
            # Drop blank lines so an empty fortune can never be picked.
            fortunes = [line.strip() for line in file if line.strip()]
        if not fortunes:
            send_message("No fortunes available.", sender_id, interface)
            return
        fortune = random.choice(fortunes)
        send_message(f"🔮 {fortune} 🔮", sender_id, interface)
    except Exception as e:
        send_message(f"Error generating fortune: {e}", sender_id, interface)
def handle_stats_steps(sender_id, message, step, interface):
    """Answer a choice made in the stats menu.

    Only step 1 is handled. 'x' returns to the main menu; 'n', 'h' and 'r'
    send node counts, hardware model counts or role counts and then show the
    stats menu again. Any other input is ignored.

    Args:
        sender_id: Identifier of the requesting user.
        message: Raw reply text; compared case-insensitively after stripping.
        step: Current step of the STATS flow.
        interface: Mesh interface whose ``nodes`` mapping is summarised.
    """
    choice = message.lower().strip()
    # A two-character reply ending in 'x' is reduced to its first character.
    if len(choice) == 2 and choice.endswith('x'):
        choice = choice[0]

    if step != 1:
        return
    if choice == 'x':
        handle_help_command(sender_id, interface)
        return

    def tally(field):
        """Count nodes by ``node['user'][field]``, defaulting to 'Unknown'."""
        counts = {}
        for node in interface.nodes.values():
            value = node['user'].get(field, 'Unknown')
            counts[value] = counts.get(value, 0) + 1
        return "\n".join([f"{value}: {count}" for value, count in counts.items()])

    if choice == 'n':
        now = int(time.time())
        windows = (
            ("All time", None),
            ("Last 24 hours", 86400),
            ("Last 8 hours", 28800),
            ("Last hour", 3600),
        )
        summary = []
        for label, span in windows:
            if span is None:
                seen = len(interface.nodes)
            else:
                cutoff = now - span
                # Nodes without a 'lastHeard' value are not counted.
                seen = len([
                    node for node in interface.nodes.values()
                    if node.get('lastHeard') is not None and node['lastHeard'] >= cutoff
                ])
            summary.append(f"- {label}: {seen}")
        response = "Total nodes seen:\n" + "\n".join(summary)
    elif choice == 'h':
        response = "Hardware Models:\n" + tally('hwModel')
    elif choice == 'r':
        response = "Roles:\n" + tally('role')
    else:
        return

    send_message(response, sender_id, interface)
    handle_stats_command(sender_id, interface)
def handle_bb_steps(sender_id, message, step, state, interface, bbs_nodes):
    """Advance the interactive bulletin-board conversation by one step.

    Steps:
        1: ``message`` is 'e' (back to the BBS menu) or a board index 0-3.
        2: 'r' lists the bulletins on the chosen board, 'p' starts a post.
        3: ``message`` is the id of the bulletin to display.
        4: ``message`` is the subject of the new bulletin.
        5: ``message`` is appended to the body until "END" is received.

    Args:
        sender_id: Identifier of the requesting user.
        message: Text the user sent for this step.
        step: Current step number (see above).
        state: The user's state dict; reads 'board', 'subject', 'content'.
        interface: Mesh interface used for lookups and sending.
        bbs_nodes: Passed through to add_bulletin.
    """
    boards = {0: "General", 1: "Info", 2: "News", 3: "Urgent"}

    if step == 1:
        if message.lower() == 'e':
            handle_help_command(sender_id, interface, 'bbs')
            return
        try:
            board_name = boards[int(message)]
        except (ValueError, KeyError):
            # Not a known board index: keep the current state so the user
            # can simply answer again instead of crashing the handler.
            send_message("Invalid board selection. Please try again.", sender_id, interface)
            return
        bulletins = get_bulletins(board_name)
        response = f"{board_name} has {len(bulletins)} messages.\n[R]ead [P]ost"
        send_message(response, sender_id, interface)
        update_user_state(sender_id, {'command': 'BULLETIN_ACTION', 'step': 2, 'board': board_name})

    elif step == 2:
        board_name = state['board']
        if message.lower() == 'r':
            bulletins = get_bulletins(board_name)
            if bulletins:
                send_message(f"Select a bulletin number to view from {board_name}:", sender_id, interface)
                for bulletin in bulletins:
                    send_message(f"[{bulletin[0]}] {bulletin[1]}", sender_id, interface)
                update_user_state(sender_id, {'command': 'BULLETIN_READ', 'step': 3, 'board': board_name})
            else:
                send_message(f"No bulletins in {board_name}.", sender_id, interface)
                handle_bb_steps(sender_id, 'e', 1, state, interface, bbs_nodes)
        elif message.lower() == 'p':
            if board_name.lower() == 'urgent':
                # Posting to Urgent is restricted when an allow-list is configured.
                node_id = get_node_id_from_num(sender_id, interface)
                allowed_nodes = interface.allowed_nodes
                logging.info(f"Checking permissions for node_id: {node_id} with allowed_nodes: {allowed_nodes}")  # Debug statement
                if allowed_nodes and node_id not in allowed_nodes:
                    send_message("You don't have permission to post to this board.", sender_id, interface)
                    handle_bb_steps(sender_id, 'e', 1, state, interface, bbs_nodes)
                    return
            send_message("What is the subject of your bulletin? Keep it short.", sender_id, interface)
            update_user_state(sender_id, {'command': 'BULLETIN_POST', 'step': 4, 'board': board_name})

    elif step == 3:
        try:
            bulletin_id = int(message)
        except ValueError:
            send_message("Invalid input. Please enter a valid bulletin number.", sender_id, interface)
            return
        try:
            sender_short_name, date, subject, content, unique_id = get_bulletin_content(bulletin_id)
        except TypeError:
            # Same guard the mail flow uses for a failed lookup: unpacking a
            # non-tuple result (presumably None for an unknown id) raises TypeError.
            logging.info(f"Node {sender_id} tried to access non-existent bulletin")
            send_message("Bulletin not found", sender_id, interface)
        else:
            send_message(f"From: {sender_short_name}\nDate: {date}\nSubject: {subject}\n- - - - - - -\n{content}", sender_id, interface)
        handle_bb_steps(sender_id, 'e', 1, state, interface, bbs_nodes)

    elif step == 4:
        subject = message
        send_message("Send the contents of your bulletin. Send a message with END when finished.", sender_id, interface)
        update_user_state(sender_id, {'command': 'BULLETIN_POST_CONTENT', 'step': 5, 'board': state['board'], 'subject': subject, 'content': ''})

    elif step == 5:
        if message.lower() == "end":
            board = state['board']
            subject = state['subject']
            content = state['content']
            node_id = get_node_id_from_num(sender_id, interface)
            node_info = interface.nodes.get(node_id)
            if node_info is None:
                send_message("Error: Unable to retrieve your node information.", sender_id, interface)
                update_user_state(sender_id, None)
                return
            sender_short_name = node_info['user'].get('shortName', f"Node {sender_id}")
            add_bulletin(board, sender_short_name, subject, content, bbs_nodes, interface)
            send_message(f"Your bulletin '{subject}' has been posted to {board}.\n(╯°□°)╯📄📌[{board}]", sender_id, interface)
            handle_bb_steps(sender_id, 'e', 1, state, interface, bbs_nodes)
        else:
            # Accumulate the body one message at a time until END arrives.
            state['content'] += message + "\n"
            update_user_state(sender_id, state)
def handle_mail_steps(sender_id, message, step, state, interface, bbs_nodes):
    """Advance the interactive mail conversation by one step.

    Steps:
        1: Mail menu choice - [R]ead, [S]end or E[X]IT.
        2: Id of the message to read.
        3: Short name of the recipient node.
        4: Action for the message just read - 'd' deletes, 'r' replies,
           anything else keeps it.
        5: Subject of the new message.
        6: Index picked from the list of nodes sharing a short name.
        7: Body text, accumulated until a message consisting of "END".
        8: "y" re-opens the mail menu; anything else ends the conversation.

    Args:
        sender_id: Identifier of the requesting user.
        message: Text the user sent for this step.
        step: Current step number (see above).
        state: The user's state dict carried between steps.
        interface: Mesh interface used for lookups and sending.
        bbs_nodes: Passed through to add_mail / delete_mail.
    """
    message = message.strip()
    # A two-character reply ending in 'x' is reduced to its first character.
    # Free-text steps (3: short name, 5: subject, 7: body) are exempt so that
    # a legitimate entry such as "ax" is no longer silently truncated.
    if step not in (3, 5, 7) and len(message) == 2 and message[1] == 'x':
        message = message[0]

    if step == 1:
        choice = message.lower()
        if choice == 'r':
            sender_node_id = get_node_id_from_num(sender_id, interface)
            mail = get_mail(sender_node_id)
            if mail:
                send_message(f"You have {len(mail)} mail messages. Select a message number to read:", sender_id, interface)
                for msg in mail:
                    send_message(f"-{msg[0]}-\nDate: {msg[3]}\nFrom: {msg[1]}\nSubject: {msg[2]}", sender_id, interface)
                update_user_state(sender_id, {'command': 'MAIL', 'step': 2})
            else:
                send_message("There are no messages in your mailbox.📭", sender_id, interface)
                update_user_state(sender_id, None)
        elif choice == 's':
            send_message("What is the Short Name of the node you want to leave a message for?", sender_id, interface)
            update_user_state(sender_id, {'command': 'MAIL', 'step': 3})
        elif choice == 'x':
            handle_help_command(sender_id, interface)

    elif step == 2:
        try:
            mail_id = int(message)
        except ValueError:
            # Non-numeric reply: keep the state so the user can answer again.
            send_message("Invalid input. Please enter a valid message number.", sender_id, interface)
            return
        try:
            sender_node_id = get_node_id_from_num(sender_id, interface)
            sender, date, subject, content, unique_id = get_mail_content(mail_id, sender_node_id)
            send_message(f"Date: {date}\nFrom: {sender}\nSubject: {subject}\n{content}", sender_id, interface)
            send_message("What would you like to do with this message?\n[K]eep [D]elete [R]eply", sender_id, interface)
            update_user_state(sender_id, {'command': 'MAIL', 'step': 4, 'mail_id': mail_id, 'unique_id': unique_id, 'sender': sender, 'subject': subject, 'content': content})
        except TypeError:
            # Unpacking a failed lookup raises TypeError.
            logging.info(f"Node {sender_id} tried to access non-existent message")
            send_message("Mail not found", sender_id, interface)
            update_user_state(sender_id, None)

    elif step == 3:
        short_name = message.lower()
        nodes = get_node_info(interface, short_name)
        if not nodes:
            send_message("I'm unable to find that node in my database.", sender_id, interface)
            handle_mail_command(sender_id, interface)
        elif len(nodes) == 1:
            recipient_id = nodes[0]['num']
            recipient_name = get_node_name(recipient_id, interface)
            send_message(f"What is the subject of your message to {recipient_name}?\nKeep it short.", sender_id, interface)
            update_user_state(sender_id, {'command': 'MAIL', 'step': 5, 'recipient_id': recipient_id})
        else:
            send_message("There are multiple nodes with that short name. Which one would you like to leave a message for?", sender_id, interface)
            for i, node in enumerate(nodes):
                send_message(f"[{i}] {node['longName']}", sender_id, interface)
            update_user_state(sender_id, {'command': 'MAIL', 'step': 6, 'nodes': nodes})

    elif step == 4:
        if message.lower() == "d":
            unique_id = state['unique_id']
            sender_node_id = get_node_id_from_num(sender_id, interface)
            delete_mail(unique_id, sender_node_id, bbs_nodes, interface)
            send_message("The message has been deleted 🗑️", sender_id, interface)
            update_user_state(sender_id, None)
        elif message.lower() == "r":
            sender = state['sender']
            send_message(f"Send your reply to {sender} now, followed by a message with END", sender_id, interface)
            update_user_state(sender_id, {'command': 'MAIL', 'step': 7, 'reply_to_mail_id': state['mail_id'], 'subject': f"Re: {state['subject']}", 'content': ''})
        else:
            send_message("The message has been kept in your inbox.✉️", sender_id, interface)
            update_user_state(sender_id, None)

    elif step == 5:
        subject = message
        send_message("Send your message. You can send it in multiple messages if it's too long for one.\nSend a single message with END when you're done", sender_id, interface)
        update_user_state(sender_id, {'command': 'MAIL', 'step': 7, 'recipient_id': state['recipient_id'], 'subject': subject, 'content': ''})

    elif step == 6:
        candidates = state['nodes']
        try:
            selected_node_index = int(message)
        except ValueError:
            selected_node_index = -1
        if not 0 <= selected_node_index < len(candidates):
            # Reject non-numeric and out-of-range picks instead of raising.
            send_message("Invalid selection. Please enter one of the listed numbers.", sender_id, interface)
            return
        recipient_id = candidates[selected_node_index]['num']
        recipient_name = get_node_name(recipient_id, interface)
        send_message(f"What is the subject of your message to {recipient_name}?\nKeep it short.", sender_id, interface)
        update_user_state(sender_id, {'command': 'MAIL', 'step': 5, 'recipient_id': recipient_id})

    elif step == 7:
        if message.lower() == "end":
            if 'reply_to_mail_id' in state:
                recipient_id = get_sender_id_by_mail_id(state['reply_to_mail_id'])  # Get the sender ID from the mail ID
            else:
                recipient_id = state.get('recipient_id')
            subject = state['subject']
            content = state['content']
            recipient_name = get_node_name(recipient_id, interface)
            sender_short_name = get_node_short_name(get_node_id_from_num(sender_id, interface), interface)
            add_mail(get_node_id_from_num(sender_id, interface), sender_short_name, recipient_id, subject, content, bbs_nodes, interface)
            send_message(f"Mail has been posted to the mailbox of {recipient_name}.\n(╯°□°)╯📨📬", sender_id, interface)
            notification_message = f"You have a new mail message from {sender_short_name}. Check your mailbox by responding to this message with CM."
            send_message(notification_message, recipient_id, interface)
            # The state was previously reset to None and then immediately
            # overwritten; only the final value is kept.
            update_user_state(sender_id, {'command': 'MAIL', 'step': 8})
        else:
            # Accumulate the body one message at a time until END arrives.
            state['content'] += message + "\n"
            update_user_state(sender_id, state)

    elif step == 8:
        if message.lower() == "y":
            handle_mail_command(sender_id, interface)
        else:
            send_message("Okay, feel free to send another command.", sender_id, interface)
            update_user_state(sender_id, None)
def handle_wall_of_shame_command(sender_id, interface):
    """Send a list of nodes whose reported battery level is below 20%.

    Nodes without a 'batteryLevel' metric are treated as 101 and therefore
    never listed. When nothing qualifies a single "none found" line is sent.
    """
    header = "Devices with battery levels below 20%:\n"
    offenders = []
    for node in interface.nodes.values():
        level = node.get('deviceMetrics', {}).get('batteryLevel', 101)
        if level < 20:
            offenders.append(f"{node['user']['longName']} - Battery {level}%\n")

    if offenders:
        response = header + "".join(offenders)
    else:
        response = "No devices with battery levels below 20% found."
    send_message(response, sender_id, interface)
def handle_channel_directory_command(sender_id, interface):
    """Send the channel directory menu and put the user at step 1 of that flow."""
    prompt = "\n".join((
        "📚CHANNEL DIRECTORY📚",
        "What would you like to do?",
        "[V]iew [P]ost E[X]IT",
    ))
    send_message(prompt, sender_id, interface)
    update_user_state(sender_id, {'command': 'CHANNEL_DIRECTORY', 'step': 1})
def handle_channel_directory_steps(sender_id, message, step, state, interface):
    """Advance the interactive channel-directory conversation by one step.

    Steps:
        1: Menu choice - [V]iew, [P]ost or E[X]IT.
        2: Index of the channel to display (as listed by [V]iew).
        3: Name of the channel being added.
        4: URL or PSK of the channel being added.

    Args:
        sender_id: Identifier of the requesting user.
        message: Text the user sent for this step.
        step: Current step number (see above).
        state: The user's state dict; step 4 reads 'channel_name'.
        interface: Mesh interface used for sending.
    """
    message = message.strip()
    # A two-character reply ending in 'x' is reduced to its first character.
    # Only menu/numeric steps get this treatment; channel names and URLs
    # (steps 3 and 4) must be stored exactly as typed.
    if step in (1, 2) and len(message) == 2 and message[1] == 'x':
        message = message[0]

    if step == 1:
        choice = message.lower()
        if choice == 'x':
            handle_help_command(sender_id, interface)
            return
        elif choice == 'v':
            channels = get_channels()
            if channels:
                response = "Select a channel number to view:\n" + "\n".join(
                    [f"[{i}] {channel[0]}" for i, channel in enumerate(channels)])
                send_message(response, sender_id, interface)
                update_user_state(sender_id, {'command': 'CHANNEL_DIRECTORY', 'step': 2})
            else:
                send_message("No channels available in the directory.", sender_id, interface)
                handle_channel_directory_command(sender_id, interface)
        # Was `choice.lower == 'p'` (method object, never equal to a string),
        # which made the [P]ost option unreachable.
        elif choice == 'p':
            send_message("Name your channel for the directory:", sender_id, interface)
            update_user_state(sender_id, {'command': 'CHANNEL_DIRECTORY', 'step': 3})

    elif step == 2:
        try:
            channel_index = int(message)
        except ValueError:
            # Non-numeric reply: keep the state so the user can answer again.
            send_message("Invalid input. Please enter a valid channel number.", sender_id, interface)
            return
        channels = get_channels()
        if 0 <= channel_index < len(channels):
            channel_name, channel_url = channels[channel_index]
            send_message(f"Channel Name: {channel_name}\nChannel URL:\n{channel_url}", sender_id, interface)
        handle_channel_directory_command(sender_id, interface)

    elif step == 3:
        channel_name = message
        send_message("Send a message with your channel URL or PSK:", sender_id, interface)
        update_user_state(sender_id, {'command': 'CHANNEL_DIRECTORY', 'step': 4, 'channel_name': channel_name})

    elif step == 4:
        channel_url = message
        channel_name = state['channel_name']
        add_channel(channel_name, channel_url)
        send_message(f"Your channel '{channel_name}' has been added to the directory.", sender_id, interface)
        handle_channel_directory_command(sender_id, interface)
def handle_send_mail_command(sender_id, message, interface, bbs_nodes):
    """Handle the ``SM,,{short_name},,{subject},,{message}`` quick command.

    Sends usage help when the message does not split into four ``,,`` fields,
    and an explanatory reply when the short name matches no node or more
    than one. Otherwise stores the mail, confirms to the sender and notifies
    the recipient. Any exception is logged and reported as a generic error.
    """
    try:
        fields = message.split(",,", 3)
        if len(fields) != 4:
            send_message("Send Mail Quick Command format:\nSM,,{short_name},,{subject},,{message}", sender_id, interface)
            return
        short_name, subject, content = fields[1:]

        matches = get_node_info(interface, short_name.lower())
        if not matches:
            send_message(f"Node with short name '{short_name}' not found.", sender_id, interface)
            return
        if len(matches) > 1:
            send_message(f"Multiple nodes with short name '{short_name}' found. Please be more specific.", sender_id, interface)
            return

        recipient_id = matches[0]['num']
        recipient_name = get_node_name(recipient_id, interface)
        sender_short_name = get_node_short_name(get_node_id_from_num(sender_id, interface), interface)
        add_mail(
            get_node_id_from_num(sender_id, interface),
            sender_short_name,
            recipient_id,
            subject,
            content,
            bbs_nodes,
            interface,
        )
        send_message(f"Mail has been sent to {recipient_name}.", sender_id, interface)
        send_message(
            f"You have a new mail message from {sender_short_name}. Check your mailbox by responding to this message with CM.",
            recipient_id,
            interface,
        )
    except Exception as e:
        logging.error(f"Error processing send mail command: {e}")
        send_message("Error processing send mail command.", sender_id, interface)
def handle_check_mail_command(sender_id, interface):
    """Handle the ``CM`` quick command: list the user's mail as a numbered menu.

    The fetched mail list is stored in the user's state (CHECK_MAIL, step 1)
    so the follow-up reply can be resolved by position. Any exception is
    logged and reported as a generic error.
    """
    try:
        mailbox = get_mail(get_node_id_from_num(sender_id, interface))
        if not mailbox:
            send_message("You have no new messages.", sender_id, interface)
            return

        listing = "".join(
            f"{position:02d}. From: {entry[1]}, Subject: {entry[2]}\n"
            for position, entry in enumerate(mailbox, start=1)
        )
        response = (
            "📬 You have the following messages:\n"
            + listing
            + "\nPlease reply with the number of the message you want to read."
        )
        send_message(response, sender_id, interface)
        update_user_state(sender_id, {'command': 'CHECK_MAIL', 'step': 1, 'mail': mailbox})
    except Exception as e:
        logging.error(f"Error processing check mail command: {e}")
        send_message("Error processing check mail command.", sender_id, interface)
def handle_read_mail_command(sender_id, message, state, interface):
    """Show the mail item the user picked from the ``CM`` listing.

    ``message`` is the 1-based position in ``state['mail']``. On success the
    mail is sent, followed by the keep/delete/reply prompt, and the state
    moves to CHECK_MAIL step 2. Non-numeric input and out-of-range numbers
    get a corrective reply; other exceptions are logged and reported.
    """
    try:
        listed = state.get('mail', [])
        position = int(message) - 1
        if not 0 <= position < len(listed):
            send_message("Invalid message number. Please try again.", sender_id, interface)
            return

        mail_id = listed[position][0]
        reader_node_id = get_node_id_from_num(sender_id, interface)
        sender, date, subject, content, unique_id = get_mail_content(mail_id, reader_node_id)
        send_message(f"Date: {date}\nFrom: {sender}\nSubject: {subject}\n\n{content}", sender_id, interface)
        send_message("What would you like to do with this message?\n[K]eep [D]elete [R]eply", sender_id, interface)
        next_state = {
            'command': 'CHECK_MAIL',
            'step': 2,
            'mail_id': mail_id,
            'unique_id': unique_id,
            'sender': sender,
            'subject': subject,
            'content': content,
        }
        update_user_state(sender_id, next_state)
    except ValueError:
        send_message("Invalid input. Please enter a valid message number.", sender_id, interface)
    except Exception as e:
        logging.error(f"Error processing read mail command: {e}")
        send_message("Error processing read mail command.", sender_id, interface)
def handle_delete_mail_confirmation(sender_id, message, state, interface, bbs_nodes):
    """Apply the user's keep/delete/reply answer to the mail in ``state``.

    'd' deletes the mail and resets the state, 'r' switches to MAIL step 7 to
    collect a reply, and anything else keeps the mail and resets the state.
    Any exception is logged and reported as a generic error.
    """
    try:
        action = message.lower().strip()
        # A two-character reply ending in 'x' is reduced to its first character.
        if len(action) == 2 and action.endswith('x'):
            action = action[0]

        if action == 'r':
            original_sender = state['sender']
            send_message(f"Send your reply to {original_sender} now, followed by a message with END", sender_id, interface)
            reply_state = {
                'command': 'MAIL',
                'step': 7,
                'reply_to_mail_id': state['mail_id'],
                'subject': f"Re: {state['subject']}",
                'content': '',
            }
            update_user_state(sender_id, reply_state)
            return

        if action == 'd':
            unique_id = state['unique_id']
            owner_node_id = get_node_id_from_num(sender_id, interface)
            delete_mail(unique_id, owner_node_id, bbs_nodes, interface)
            send_message("The message has been deleted 🗑️", sender_id, interface)
        else:
            send_message("The message has been kept in your inbox.✉️", sender_id, interface)
        update_user_state(sender_id, None)
    except Exception as e:
        logging.error(f"Error processing delete mail confirmation: {e}")
        send_message("Error processing delete mail confirmation.", sender_id, interface)
def handle_post_bulletin_command(sender_id, message, interface, bbs_nodes):
    """Handle the ``PB,,{board_name},,{subject},,{content}`` quick command.

    A board name matching one of the built-in boards (General, Info, News,
    Urgent) case-insensitively is stored under its canonical spelling; any
    other name is used as typed. Posting to Urgent is subject to the same
    allow-list check as the interactive bulletin flow, and a successful
    Urgent post is also announced to the broadcast address.

    Any exception is logged and reported to the sender as a generic error.
    """
    try:
        parts = message.split(",,", 3)
        if len(parts) != 4:
            send_message("Post Bulletin Quick Command format:\nPB,,{board_name},,{subject},,{content}", sender_id, interface)
            return

        _, board_name, subject, content = parts
        # Canonicalise known boards so "urgent" and " Urgent " land on the
        # same board the interactive menu and CB quick command read from.
        known_boards = {name.lower(): name for name in ("General", "Info", "News", "Urgent")}
        board_name = known_boards.get(board_name.strip().lower(), board_name)
        is_urgent = board_name == "Urgent"

        sender_node_id = get_node_id_from_num(sender_id, interface)
        if is_urgent:
            # Mirror the interactive flow: when an allow-list is configured,
            # only listed nodes may post to Urgent.
            allowed_nodes = getattr(interface, 'allowed_nodes', None)
            if allowed_nodes and sender_node_id not in allowed_nodes:
                send_message("You don't have permission to post to this board.", sender_id, interface)
                return

        sender_short_name = get_node_short_name(sender_node_id, interface)
        add_bulletin(board_name, sender_short_name, subject, content, bbs_nodes, interface)
        send_message(f"Your bulletin '{subject}' has been posted to {board_name}.", sender_id, interface)

        if is_urgent:
            notification_message = f"💥NEW URGENT BULLETIN💥\nFrom: {sender_short_name}\nTitle: {subject}"
            send_message(notification_message, BROADCAST_NUM, interface)
    except Exception as e:
        logging.error(f"Error processing post bulletin command: {e}")
        send_message("Error processing post bulletin command.", sender_id, interface)
def handle_check_bulletin_command(sender_id, message, interface):
    """Handle the ``CB,,board_name`` quick command: list a board's bulletins.

    The board name is matched after stripping and capitalising, so
    "general" selects "General". An unknown board gets a reply naming the
    available boards. On success the bulletin list is stored in the user's
    state (CHECK_BULLETIN, step 1) so the follow-up reply can be resolved by
    position. Any exception is logged and reported as a generic error.
    """
    try:
        # Split the message only once
        parts = message.split(",,", 1)
        if len(parts) != 2 or not parts[1].strip():
            send_message("Check Bulletins Quick Command format:\nCB,,board_name", sender_id, interface)
            return

        boards = ("General", "Info", "News", "Urgent")
        requested = parts[1].strip()
        board_name = requested.capitalize()
        if board_name not in boards:
            # Previously an unknown board raised StopIteration from next()
            # and surfaced only as a generic processing error.
            send_message(f"Unknown board '{requested}'. Available boards: {', '.join(boards)}", sender_id, interface)
            return

        bulletins = get_bulletins(board_name)
        if not bulletins:
            send_message(f"No bulletins available on {board_name} board.", sender_id, interface)
            return

        response = f"📰 Bulletins on {board_name} board:\n"
        for i, bulletin in enumerate(bulletins):
            response += f"[{i+1:02d}] Subject: {bulletin[1]}, From: {bulletin[2]}, Date: {bulletin[3]}\n"
        response += "\nPlease reply with the number of the bulletin you want to read."
        send_message(response, sender_id, interface)

        update_user_state(sender_id, {'command': 'CHECK_BULLETIN', 'step': 1, 'board_name': board_name, 'bulletins': bulletins})
    except Exception as e:
        logging.error(f"Error processing check bulletin command: {e}")
        send_message("Error processing check bulletin command.", sender_id, interface)
def handle_read_bulletin_command(sender_id, message, state, interface):
    """Show the bulletin the user picked from the ``CB`` listing.

    ``message`` is the 1-based position in ``state['bulletins']``. On success
    the bulletin is sent and the user's state is reset to ``None``.
    Non-numeric input and out-of-range numbers get a corrective reply; other
    exceptions are logged and reported.
    """
    try:
        listed = state.get('bulletins', [])
        position = int(message) - 1
        if not 0 <= position < len(listed):
            send_message("Invalid bulletin number. Please try again.", sender_id, interface)
            return

        sender, date, subject, content, unique_id = get_bulletin_content(listed[position][0])
        send_message(f"Date: {date}\nFrom: {sender}\nSubject: {subject}\n\n{content}", sender_id, interface)
        update_user_state(sender_id, None)
    except ValueError:
        send_message("Invalid input. Please enter a valid bulletin number.", sender_id, interface)
    except Exception as e:
        logging.error(f"Error processing read bulletin command: {e}")
        send_message("Error processing read bulletin command.", sender_id, interface)
def handle_post_channel_command(sender_id, message, interface):
    """Handle the ``CHP,,{channel_name},,{channel_url}`` quick command.

    The message is split on ``,,`` as documented in the usage text and as
    every other quick command in this module does. Messages that contain no
    ``,,`` are still split on the legacy ``|`` separator. Only the first two
    separators are consumed, so the URL itself may contain the separator.

    Any exception is logged and reported to the sender as a generic error.
    """
    try:
        # The original split on "|" only, so the documented ",," form always
        # fell through to the usage message.
        separator = ",," if ",," in message else "|"
        parts = message.split(separator, 2)
        if len(parts) != 3:
            send_message("Post Channel Quick Command format:\nCHP,,{channel_name},,{channel_url}", sender_id, interface)
            return

        _, channel_name, channel_url = parts
        bbs_nodes = interface.bbs_nodes
        add_channel(channel_name, channel_url, bbs_nodes, interface)
        send_message(f"Channel '{channel_name}' has been added to the directory.", sender_id, interface)
    except Exception as e:
        logging.error(f"Error processing post channel command: {e}")
        send_message("Error processing post channel command.", sender_id, interface)
def handle_check_channel_command(sender_id, interface):
    """Send a numbered list of directory channels and await a selection.

    The channel list is stored in the user's state (CHECK_CHANNEL, step 1)
    so the follow-up reply can be resolved by position. Any exception is
    logged and reported as a generic error.
    """
    try:
        directory = get_channels()
        if not directory:
            send_message("No channels available in the directory.", sender_id, interface)
            return

        listing = "".join(
            f"{position:02d}. Name: {entry[0]}\n"
            for position, entry in enumerate(directory, start=1)
        )
        response = (
            "Available Channels:\n"
            + listing
            + "\nPlease reply with the number of the channel you want to view."
        )
        send_message(response, sender_id, interface)
        update_user_state(sender_id, {'command': 'CHECK_CHANNEL', 'step': 1, 'channels': directory})
    except Exception as e:
        logging.error(f"Error processing check channel command: {e}")
        send_message("Error processing check channel command.", sender_id, interface)
def handle_read_channel_command(sender_id, message, state, interface):
    """Show the channel the user picked from a numbered channel listing.

    ``message`` is the 1-based position in ``state['channels']``. On success
    the channel name and URL are sent and the user's state is reset to
    ``None``. Non-numeric input and out-of-range numbers get a corrective
    reply; other exceptions are logged and reported.
    """
    try:
        listed = state.get('channels', [])
        position = int(message) - 1
        if not 0 <= position < len(listed):
            send_message("Invalid channel number. Please try again.", sender_id, interface)
            return

        channel_name, channel_url = listed[position]
        send_message(f"Channel Name: {channel_name}\nChannel URL: {channel_url}", sender_id, interface)
        update_user_state(sender_id, None)
    except ValueError:
        send_message("Invalid input. Please enter a valid channel number.", sender_id, interface)
    except Exception as e:
        logging.error(f"Error processing read channel command: {e}")
        send_message("Error processing read channel command.", sender_id, interface)
def handle_list_channels_command(sender_id, interface):
    """Send a numbered list of directory channels and await a selection.

    Same listing as handle_check_channel_command, but the user's state is
    recorded under the LIST_CHANNELS command (step 1). Any exception is
    logged and reported as a generic error.
    """
    try:
        directory = get_channels()
        if not directory:
            send_message("No channels available in the directory.", sender_id, interface)
            return

        listing = "".join(
            f"{position:02d}. Name: {entry[0]}\n"
            for position, entry in enumerate(directory, start=1)
        )
        response = (
            "Available Channels:\n"
            + listing
            + "\nPlease reply with the number of the channel you want to view."
        )
        send_message(response, sender_id, interface)
        update_user_state(sender_id, {'command': 'LIST_CHANNELS', 'step': 1, 'channels': directory})
    except Exception as e:
        logging.error(f"Error processing list channels command: {e}")
        send_message("Error processing list channels command.", sender_id, interface)
def handle_quick_help_command(sender_id, interface):
    """Send the list of quick commands (SM, CM, PB, CB) to the user."""
    lines = (
        "QUICK COMMANDS✈",
        "Send command below for usage info:",
        "SM,, - Send Mail",
        "CM - Check Mail",
        "PB,, - Post Bulletin",
        "CB,, - Check Bulletins",
    )
    send_message("\n".join(lines) + "\n", sender_id, interface)
def get_games_available(game_files):
    """Map display titles to filenames for the given files under ``./games``.

    For each file the first line is read. If it starts with ``title=``
    (case-insensitive) the remainder, stripped of whitespace and surrounding
    double quotes, is the title; otherwise the filename itself, extension
    included, is the title. Files that cannot be read are reported on stdout
    and skipped. A later file with the same title replaces an earlier one.

    Returns:
        dict of {title: filename}.
    """
    catalogue = {}
    for filename in game_files:
        try:
            with open(os.path.join('./games', filename), 'r', encoding='utf-8') as handle:
                header = handle.readline().strip()
            has_title = header.lower().startswith("title=")
            title = header.split("=", 1)[1].strip().strip('"') if has_title else filename
            catalogue[title] = filename
        except Exception as e:
            print(f"Error loading game {filename}: {e}")
    return catalogue
def handle_games_command(sender_id, interface):
    """Send the Games Menu listing the text games found under ``./games``.

    Regular files ending in ``.txt`` or ``.csv``, or with no dot in their
    name, are offered in filename order. The filenames and titles shown are
    stored in the user's state (GAMES, step 1) so the numbered reply can be
    resolved. A missing or unreadable games directory is treated the same
    as an empty one.

    Returns:
        The menu text that was sent, or ``None`` when no games are available.
    """
    games_dir = './games'
    try:
        # os.listdir order is arbitrary; sort for a stable menu.
        game_files = sorted(
            name for name in os.listdir(games_dir)
            if os.path.isfile(os.path.join(games_dir, name))
            and (name.endswith(('.txt', '.csv')) or '.' not in name)
        )
    except OSError:
        # No games directory yet: fall through to the "no games" reply
        # instead of raising out of the message handler.
        game_files = []

    games_available = get_games_available(game_files)
    if not games_available:
        send_message("No games available yet. Come back soon.", sender_id, interface)
        update_user_state(sender_id, {'command': 'UTILITIES', 'step': 1})
        return None

    game_titles = list(games_available.keys())  # Display titles
    game_filenames = list(games_available.values())  # Actual filenames
    numbered_games = "\n".join(f"{i+1}. {title}" for i, title in enumerate(game_titles))
    numbered_games += "\n[X] Exit"
    response = f"🎮 Games Menu 🎮\nWhich game would you like to play?\n{numbered_games}"
    send_message(response, sender_id, interface)

    # Always refresh the stored game list when the menu is displayed.
    update_user_state(sender_id, {'command': 'GAMES', 'step': 1, 'games': game_filenames, 'titles': game_titles})
    return response
def handle_game_menu_selection(sender_id, message, step, interface, state):
    """Resolve the user's reply to the Games Menu.

    'x' (any case) returns to the main menu. A number within the range of
    ``state['games']`` starts that game immediately. Anything else gets a
    corrective reply. ``step`` is accepted but not used.
    """
    if message.lower() == "x":
        # Same exit convention as the other menus.
        handle_help_command(sender_id, interface)
        return

    filenames = state.get('games', [])
    try:
        index = int(message) - 1  # Menu numbers are 1-based.
        if index < 0 or index >= len(filenames):
            send_message("Invalid selection. Please enter a valid game number or 'X' to exit.", sender_id, interface)
        else:
            chosen = filenames[index]
            # Reset first, then mark the user as in-game before starting.
            update_user_state(sender_id, None)
            update_user_state(sender_id, {'command': 'IN_GAME', 'step': 3, 'game': chosen})
            start_selected_game(sender_id, interface, {'game': chosen})
    except ValueError:
        send_message("Invalid input. Please enter a number corresponding to a game or 'X' to exit.", sender_id, interface)
def start_selected_game(sender_id, interface, state):
    """Load the game named in ``state['game']`` and present its first segment.

    On success the user's state becomes IN_GAME (step 3) with the game's
    filename, title, map and position 1. On any failure the user gets an
    error message and the Games Menu is shown again, so their stored state
    once more contains a selectable game list.
    """
    def fail(text):
        """Report ``text`` and return the user to a freshly built Games Menu."""
        send_message(text, sender_id, interface)
        update_user_state(sender_id, {'command': 'GAMES', 'step': 1})
        # Re-display the menu: a bare GAMES state has no 'games' list, which
        # would make every later selection invalid.
        handle_games_command(sender_id, interface)

    game_name = state.get('game', None)
    if not game_name:
        fail("Unexpected error: No game found. Returning to game menu.")
        return

    # Construct the game file path
    game_file_path = os.path.join('./games', game_name)

    # Final check if the file exists
    if not os.path.exists(game_file_path):
        fail(f"Error: The game '{game_name}' could not be loaded.")
        return

    # Load the game map with title handling
    try:
        loaded = load_game_map(game_file_path)
    except Exception as e:
        fail(f"Error loading game: {e}")
        return

    # load_game_map returns None (not a tuple) for an empty or unreadable
    # file; unpacking that directly raised TypeError and leaked a Python
    # error message to the user.
    game_title, game_map = loaded if loaded else (None, None)
    if not game_map:
        fail(f"Error: The game '{game_name}' could not be loaded.")
        return

    # Set up the user state for playing (game_title must be included)
    new_state = {
        'command': 'IN_GAME',
        'step': 3,
        'game': game_name,
        'game_title': game_title,
        'game_map': game_map,
        'game_position': 1
    }
    update_user_state(sender_id, new_state)

    # Present the first segment
    present_story_segment(sender_id, interface, new_state)
def load_game_map(file_path):
    """Load a comma-separated game file into a title and a position map.

    If the first line starts with ``title=`` (case-insensitive) the rest of
    that line, stripped of whitespace and surrounding double quotes, is the
    title and the line is not part of the map. Otherwise the title is the
    file's base name without its extension. Every remaining line, blank
    ones included, becomes one map entry: it is stripped and split on ","
    and keyed by its 1-based position.

    Args:
        file_path: Path of the game file to read (decoded as UTF-8).

    Returns:
        ``(game_title, game_map)`` where ``game_map`` is
        ``{position: [field, ...]}``, or ``None`` when the file is empty or
        cannot be read.
    """
    logging.debug(f"load_game_map: opening {file_path}")
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
        logging.debug(f"load_game_map: read {len(lines)} lines from {file_path}")

        if not lines:
            logging.error(f"Game file {file_path} is empty")
            return None

        # Check if the first line contains a title
        first_line = lines[0].strip()
        if first_line.lower().startswith("title="):
            game_title = first_line.split("=", 1)[1].strip().strip('"')
            game_lines = lines[1:]  # Skip title
        else:
            game_title = os.path.splitext(os.path.basename(file_path))[0]  # Filename without path/extension
            game_lines = lines
        logging.debug(f"load_game_map: game title detected -> {game_title}")

        # Positions are 1-based line numbers (after any title line).
        game_map = {
            position: line.strip().split(",")
            for position, line in enumerate(game_lines, start=1)
        }
        logging.debug(f"load_game_map: loaded {len(game_map)} entries")
        return game_title, game_map
    except Exception as e:
        logging.error(f"Error loading game map {file_path}: {e}")
        return None
def present_story_segment(sender_id, interface, state):
    """Send the story text and numbered choices for the current game position.

    Reads 'game', 'game_title', 'game_map' and 'game_position' from
    ``state``. A position missing from the map is reported as an invalid
    game state; a segment without choices is a game over. In both cases the
    Games Menu is shown again. Otherwise the segment is sent and the
    IN_GAME state (step 3) is stored.
    """
    name = state.get('game')
    title = state.get('game_title', "Unknown Game")
    story_map = state.get('game_map', {})
    position = state.get('game_position', 1)

    if position not in story_map:
        send_message("Error: Invalid game state.", sender_id, interface)
        update_user_state(sender_id, {'command': 'GAMES', 'step': 1})
        handle_games_command(sender_id, interface)
        return

    # First field is the story text; the rest alternate label, target.
    segment = story_map[position]
    storyline = segment[0]
    choices = segment[1:]

    if not choices:
        # No choices means this segment ends the game.
        send_message(f"🎮 {title} 🎮\n\n{storyline}\n\n💀 GAME OVER! Returning to the game menu...", sender_id, interface)
        update_user_state(sender_id, {'command': 'GAMES', 'step': 1})
        time.sleep(1)  # Pause before the menu message is sent.
        handle_games_command(sender_id, interface)
        return

    # Every second field, starting with the first, is a choice label.
    numbered = "".join(
        f"{number}. {label}\n"
        for number, label in enumerate(choices[::2], start=1)
    )
    response = f"🎮 {title} 🎮\n\n{storyline}\n\n" + numbered + "\n[X] Exit"
    send_message(response, sender_id, interface)

    in_game_state = {
        'command': 'IN_GAME',
        'step': 3,
        'game': name,
        'game_title': title,
        'game_map': story_map,
        'game_position': position
    }
    update_user_state(sender_id, in_game_state)
def process_game_choice(sender_id, message, interface, state):
    """Apply the player's reply to the current segment and advance the game.

    ``message`` is 'x' (any case) to leave the game or the 1-based number of
    a choice. Choices are stored after the story text as alternating
    ``label, target_position`` fields. A target that is not in the map ends
    the game. Broken game data (odd number of choice fields, non-numeric
    target) returns the player to the Games Menu.

    Args:
        sender_id: Identifier of the player.
        message: The player's reply.
        interface: Mesh interface used for sending.
        state: The player's IN_GAME state; 'game_position' is updated in
            place when the game advances.
    """
    def back_to_menu(text):
        """Send ``text``, reset to the GAMES state and re-display the menu."""
        send_message(text, sender_id, interface)
        update_user_state(sender_id, {'command': 'GAMES', 'step': 1})
        handle_games_command(sender_id, interface)

    game_map = state.get('game_map', {})
    game_position = state.get('game_position', 1)

    if game_position not in game_map:
        back_to_menu("Error: Invalid game state.")
        return

    # Everything after the story text is label/target pairs.
    choices = game_map[game_position][1:]
    if len(choices) % 2 != 0:
        back_to_menu("Error: Game data is corrupted.")
        return

    # Handle Exit
    if message.lower() == "x":
        # .get() mirrors present_story_segment and avoids a KeyError when
        # the state carries no title.
        game_title = state.get('game_title', "Unknown Game")
        back_to_menu(f"Exiting '{game_title}'... Returning to Games Menu.")
        return

    try:
        choice_index = int(message) - 1
    except ValueError:
        send_message("Invalid input. Please enter a valid number.", sender_id, interface)
        return

    target_slot = choice_index * 2 + 1
    if choice_index < 0 or target_slot >= len(choices):
        send_message("Invalid selection. Please enter a valid number.", sender_id, interface)
        return

    try:
        target_position = int(choices[target_slot])
    except ValueError:
        # A non-numeric target is a game-file problem, not a bad reply;
        # previously the player was told their own input was invalid and
        # could never get past this segment.
        back_to_menu("Error: Game data is corrupted.")
        return

    if target_position not in game_map:
        back_to_menu("💀 GAME OVER! No further choices available. 💀 Returning to the game menu...")
        return

    state['game_position'] = target_position
    update_user_state(sender_id, state)
    present_story_segment(sender_id, interface, state)