pythonbot/console.py

158 lines
7.1 KiB
Python

import vk_api
import datetime
import time
import logging
import random
import json
import threading
import config
import dan63047VKbot
from pymysql.cursors import DictCursor
from collections import deque
from vk_api.bot_longpoll import VkBotLongPoll, VkBotEventType
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
try:
log_path = f'logs/console_log{str(datetime.datetime.now())}.log'
handler = logging.FileHandler(log_path, 'w', 'utf-8')
except:
log_path = 'console.log'
handler = logging.FileHandler(log_path, 'w', 'utf-8')
handler.setFormatter(logging.Formatter('%(message)s'))
root_logger.addHandler(handler)
def log(warning, text):
if warning:
msg = "[" + str(datetime.datetime.now()) + "] [WARNING] " + text
logging.warning(msg)
print(msg)
else:
msg = "[" + str(datetime.datetime.now()) + "] " + text
logging.info(msg)
print(msg)
try:
vk = vk_api.VkApi(token=config.vk_group_token)
longpoll = VkBotLongPoll(vk, config.group_id)
log(False, "VK API Group token: OK")
except Exception as e:
log(True, "Can't connect to longpull: "+str(e))
exit(log(False, "[SHUTDOWN]"))
def cycle():
active = True
dan63047VKbot.load_users()
print("Welcome to dan63047bot console\nEnter command:")
while active:
command = input(">")
logging.info("[" + str(datetime.datetime.now()) + "] Entered command: " + command)
command = command.split(' ', 1)
if command[0] == "exit":
print("Bye")
active = False
elif command[0] == "help":
print("Console in development")
print("help - command list")
print("exit - shutdown console")
print("message [peer_id] [message] - send message in vk from bot to peer_id")
print("msg_history [peer_id] [count] - return message history in 'count' last messages with peer_id")
elif command[0] == "message":
try:
command[1] = command[1].split(' ', 1)
except:
log(True, "No arguments")
continue
random_id = random.randint(-9223372036854775808, 9223372036854775807)
try:
m = vk.method('messages.send', {'peer_id': command[1][0], 'message': command[1][1], 'random_id': random_id})
log(False, f'message_id: {m}, peer_id: {command[1][0]}, random_id: {random_id}')
except Exception as e:
log(True, f'Failed to send message: {str(e)}')
elif command[0] == "msg_history":
try:
command[1] = command[1].split(' ', 1)
except:
log(True, "No arguments")
continue
try:
if len(command[1]) == 2:
m = vk.method('messages.getHistory', {'count': command[1][1], 'peer_id': command[1][0]})
else:
m = vk.method('messages.getHistory', {'peer_id': command[1][0]})
log(False, f"That dialogue have {m['count']} messages ({len(m['items'])} in response)")
for i in m['items']:
if str(i['from_id']) == "-"+str(config.group_id):
f = "You"
else:
user_info = vk.method('users.get', {'user_ids': i["from_id"], 'fields': 'verified,last_seen,sex'})
f = f'{user_info[0]["first_name"]} {user_info[0]["last_name"]}'
datetime_time = datetime.datetime.fromtimestamp(i['date'])
date = datetime_time.strftime('%d.%m.%Y %H:%M:%S')
msg = f"{f} {date}: {i['text']}"
if i['attachments']:
for m in i['attachments']:
if m['type'] == "sticker":
msg += f" [sticker_id{m[m['type']]['sticker_id']}]"
elif m['type'] == "wall":
msg += " [" + m['type'] + str(m[m['type']]['from_id']) + \
"_" + str(i[m['type']]['id']) + "]"
elif m['type'] == "link":
msg += " [" + m['type'] + " " + m[m['type']]['title'] + "]"
else:
msg += " [" + m['type'] + str(m[m['type']]['owner_id']) + \
"_" + str(m[m['type']]['id']) + "]"
print(msg)
except Exception as e:
log(True, f'Failed to get history: {str(e)}')
elif command[0] == "bot":
if len(command) == 2:
command[1] = command[1].split(' ', 1)
if len(command[1]) == 2:
if command[1][0] == "midnight":
try:
userinfo = dan63047VKbot.db.get_from_users(command[1][1])
log(False, "Midnight flag: "+str(userinfo["midnight"]))
dan63047VKbot.bot[int(command[1][1])].event("midnight")
except KeyError as e:
log(True, "Bot object is not exist: "+str(e))
elif command[1][0] == "changeMidnightFlag":
try:
userinfo = dan63047VKbot.db.get_from_users(command[1][1])
if userinfo["midnight"]:
dan63047VKbot.bot[int(command[1][1])].change_flag('midnight', False)
else:
dan63047VKbot.bot[int(command[1][1])].change_flag('midnight', True)
userinfo = dan63047VKbot.db.get_from_users(command[1][1])
log(False, "Midnight flag: "+str(userinfo["midnight"]))
except KeyError as e:
log(True, "Bot object is not exist: "+str(e))
elif command[1][0] == "create":
try:
dan63047VKbot.create_new_bot_object(command[1][1])
log(False, "Bot-object has been created")
except Exception as e:
log(True, "Can't create bot-object - "+str(e))
elif command[1][0] == "delete":
try:
del dan63047VKbot.bot[int(command[1][1])]
dan63047VKbot.db.delete_user(command[1][1])
log(False, "Bot-object has been deleted")
except Exception as e:
log(True, "Can't delete bot-object - "+str(e))
else:
if command[1][0] == "update":
dan63047VKbot.load_users()
else:
if dan63047VKbot.bot:
log(False, f"There is {len(dan63047VKbot.bot)} bot-objects")
for i in dan63047VKbot.bot:
print(dan63047VKbot.bot[i])
else:
print("No bot-objects")
else:
log(True, "Unknown command. Type 'help' for command list")
cycle()