import vk_api import datetime import time import requests import logging import pyowm import random import json import threading import wikipediaapi as wiki from collections import deque from config import vk, owm, vk_mda, group_id, album_for_command, owner_id from vk_api.bot_longpoll import VkBotLongPoll, VkBotEventType bot = {} users = {} debug_array = {'vk_warnings': 0, 'logger_warnings': 0, 'start_time': 0, 'messages_get': 0, 'messages_answered': 0} root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) handler = logging.FileHandler('bot.log', 'w', 'utf-8') handler.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')) root_logger.addHandler(handler) longpoll = VkBotLongPoll(vk, group_id) def update_users_json(massive): with open("users.json", 'w') as write_file: data = massive json.dump(data, write_file) write_file.close() def load_users(): try: with open("users.json", 'r') as users_file: users_not_json = json.load(users_file) for i in users_not_json: users[i] = users_not_json[i] users_file.close() for i in users: bot[int(i)] = VkBot(i, users[i]["midnight"]) except Exception as lol: log(True, f"Проблема с загрузкой users.json: {str(lol)}") def log(warning, text): if warning: logging.warning(text) debug_array['logger_warnings'] += 1 print("[" + time.strftime("%d.%m.%Y %H:%M:%S", time.gmtime()) + "][WARNING] " + text) else: logging.info(text) print("[" + time.strftime("%d.%m.%Y %H:%M:%S", time.gmtime()) + "] " + text) def toFixed(numObj, digits=0): return f"{numObj:.{digits}f}" class MyVkLongPoll(VkBotLongPoll): def listen(self): while True: try: for event in self.check(): yield event except Exception as e: err = "Беды с ВК: " + str(e) log(True, err) debug_array['vk_warnings'] += 1 time.sleep(15) continue def get_weather(place): try: weather_request = owm.weather_at_place(place) except pyowm.exceptions.api_response_error.NotFoundError as i: err = "Ошибка OpenWeather API: " + str(i) log(True, err) return "Такого города нет, либо данных о погоде нет" weather_answer = weather_request.get_weather() info = "Результат поиска погоды через OpenWeather API: " + str(weather_answer) log(False, info) return "В городе " + place + " сейчас " + weather_answer.get_detailed_status() + ", " + str( round(weather_answer.get_temperature('celsius')['temp'])) + "°C" class VkBot: def __init__(self, peer_id, midnight=False): log(False, f"Создан объект бота! id{peer_id}") self._CHAT_ID = peer_id self._ECHO_MODE = False self._ACCESS_LEVEL = 1 if midnight: self._MIDNIGHT_EVENT = True else: self._MIDNIGHT_EVENT = False if int(self._CHAT_ID) == int(owner_id): self._OWNER = True else: self._OWNER = False self._COMMANDS = ["!image", "!my_id", "!h", "!user_id", "!group_id", "!help", "!weather", "!wiki", "!byn", "!echo", "!game", "!debug", "!midnight", "!access"] def event(self, event): if event == "midnight" and self._MIDNIGHT_EVENT: current_time = datetime.datetime.fromtimestamp(time.time() + 10800) image = None midnight_text = ["Мидннайт!", "Полночь!", "Midnight!", "миднигхт", "Середина ночи", "Смена даты!"] midnight_after = ["Ложись спать!", "P E A C E A N D T R A N Q U I L I T Y", "Поиграй в майнкрафт", "Втыкай в ВК дальше", "hat in time is gay", "R34 по ахиту это смертный грех", "Egg", "вещ или бан", "Мой ник в игре _ичё"] midnight_output = random.choice(midnight_text) + "
" + f"Наступило {current_time.strftime('%d.%m.%Y')}

" random_thing = random.randint(0, 2) if random_thing == 0: midnight_output += random.choice(midnight_after) elif random_thing == 1: midnight_output += "Картинка дня:" image = self.random_image() elif random_thing == 2: midnight_output += "Цвет дня в формате HEX: #%02x%02x%02x" % (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)) self.send(midnight_output, image) log(False, f"Бот id{self._CHAT_ID} оповестил о миднайте") def get_message(self, message, user_id): if self._ECHO_MODE: if message == "!echo off": self.send(message) self._ECHO_MODE = False log(False, f"Бот id{self._CHAT_ID} вышел из режима эхо") debug_array['messages_answered'] += 1 else: self.send(message) log(False, f"Эхо-бот id{self._CHAT_ID}: {message}") debug_array['messages_answered'] += 1 else: respond = {'attachment': None, 'text': None} message = message.split(' ', 1) if message[0] == self._COMMANDS[0]: respond['attachment'] = self.random_image() elif message[0] == self._COMMANDS[1]: respond['text'] = "Ваш ид: " + str(self._CHAT_ID) elif message[0] == self._COMMANDS[2] or message[0] == self._COMMANDS[5]: respond[ 'text'] = "Я бот, призванный доставлять неудобства.
Команды:
!my_id - сообщит ваш id в ВК
!user_id *id* - сообщит информацию о этом пользователе
!group_id *id* - сообщит информацию о этой группе
!image - отправляет рандомную картинку из альбома
!weather *город* - отправляет текущую погоду в городе (данные из OpenWeather API)
!wiki *запрос* - отправляет информацию об этом из Wikipedia
!byn - отправляет текущий курс валют, полученный из API НБ РБ
!echo - бот отправляет вам всё, что вы ему пишите
!game *камень/ножницы/бумага/статистика* - бот будет играть с вами в \"Камень, ножницы, бумага\" и записывать статистику
!midnight - бот будет уведомлять вас о 00:00 по Москве. Отправьте ещё раз, чтобы бот больше вас не уведомлял
!h, !help - справка
Дата последнего обновления: 10.05.2020 (обновление команды !midnight)
Проект бота на GitHub: https://github.com/dan63047/dan63047pythonbot" elif message[0] == self._COMMANDS[3]: try: respond['text'] = self.get_info_user(message[1]) except IndexError: respond['text'] = "Отсуствует аргумент" elif message[0] == self._COMMANDS[4]: try: respond['text'] = self.get_info_group(message[1]) except IndexError: respond['text'] = "Отсуствует аргумент" elif message[0] == self._COMMANDS[6]: try: respond['text'] = get_weather(message[1]) except IndexError: respond['text'] = "Отсуствует аргумент" elif message[0] == self._COMMANDS[7]: try: respond['text'] = self.wiki_article(message[1]) except IndexError: respond['text'] = "Отсуствует аргумент" elif message[0] == self._COMMANDS[8]: respond['text'] = self.exchange_rates() elif message[0] == self._COMMANDS[9]: vk.method('messages.send', {'peer_id': self._CHAT_ID, 'message': "Теперь бот работает в режиме эхо. Чтобы" " это выключить, введить \"!echo off\"", 'random_id': time.time()}) self._ECHO_MODE = True log(False, f"Бот id{self._CHAT_ID} в режиме эхо") elif message[0] == self._COMMANDS[10]: try: message[1] = message[1].lower() respond['text'] = self.game(message[1]) except IndexError: respond['text'] = "Отсуствует аргумент" elif message[0] == self._COMMANDS[11]: if self._ACCESS_LEVEL or int(user_id) == int(owner_id): try: respond['text'] = self.debug(message[1]) except IndexError: respond['text'] = self.debug() else: respond["text"] = "Отказано в доступе" elif message[0] == self._COMMANDS[12]: if self._ACCESS_LEVEL or int(user_id) == int(owner_id): if self._MIDNIGHT_EVENT: self._MIDNIGHT_EVENT = False self.send("Уведомление о миднайте выключено") log(False, f"Бот id{self._CHAT_ID}: Юзер отписался от ивента \"Миднайт\"") else: self._MIDNIGHT_EVENT = True self.send("Бот будет уведомлять вас о каждом миднайте") log(False, f"Бот id{self._CHAT_ID}: Юзер подписался на ивент \"Миднайт\"") users[self._CHAT_ID]["midnight"] = self._MIDNIGHT_EVENT update_users_json(users) else: respond['text'] = "Отказано в доступе" elif message[0] == self._COMMANDS[13]: if int(user_id) == int(owner_id): try: if message[1] == "owner": respond['text'] = "Теперь некоторыми командами может пользоваться только владелец бота" self._ACCESS_LEVEL = 0 elif message[1] == "all": respond['text'] = "Теперь все могут пользоваться всеми командами" self._ACCESS_LEVEL = 1 else: respond['text'] = "Некорректный аргумент" except IndexError: respond['text'] = "Отсуствует аргумент" else: respond['text'] = "Отказано в доступе" if respond['text'] or respond['attachment']: self.send(respond['text'], respond['attachment']) debug_array['messages_answered'] += 1 def debug(self, arg=None): if arg == "log": if self._OWNER: with open("bot.log", 'r') as f: log = list(deque(f, 10)) text_log = "
Последние 10 строк из лога:
" for i in range(len(log)): text_log += log[i] f.close() return text_log else: return "Отказано в доступе" else: up_time = time.time() - debug_array['start_time'] time_d = int(up_time) / (3600 * 24) time_h = int(up_time) / 3600 - int(time_d) * 24 time_min = int(up_time) / 60 - int(time_h) * 60 - int(time_d) * 24 * 60 time_sec = int(up_time) - int(time_min) * 60 - int(time_h) * 3600 - int(time_d) * 24 * 60 * 60 str_up_time = '%01d:%02d:%02d:%02d' % (time_d, time_h, time_min, time_sec) datetime_time = datetime.datetime.fromtimestamp(debug_array['start_time']) answer = "UPTIME: " + str_up_time + "
Прослушано сообщений: " + str( debug_array['messages_get']) + " (Отвечено на " + str( debug_array['messages_answered']) + ")
Ошибок в работе: " + str( debug_array['logger_warnings']) + " (Из них беды с ВК: " + str(debug_array['vk_warnings']) + ")
Обьектов бота: " + str(len(bot)) + "
Запуск бота по часам сервера: " + datetime_time.strftime('%d.%m.%Y %H:%M:%S UTC') return answer def game(self, thing): if thing == "статистика": with open("data_file.json", "r") as read_file: data = json.load(read_file) if str(self._CHAT_ID) in data: winrate = (data[str(self._CHAT_ID)]['wins'] / data[str(self._CHAT_ID)]['games']) * 100 return f"Камень, ножницы, бумага
Сыграно игр: {data[str(self._CHAT_ID)]['games']}
Из них:
•Побед: {data[str(self._CHAT_ID)]['wins']}
•Поражений: {data[str(self._CHAT_ID)]['defeats']}
•Ничей: {data[str(self._CHAT_ID)]['draws']}
Процент побед: {toFixed(winrate, 2)}%" else: return "Похоже, вы ещё никогда не играли в Камень, ножницы, бумага" elif thing == "камень" or thing == "ножницы" or thing == "бумага": things = ["камень", "ножницы", "бумага"] bot_thing = random.choice(things) if thing == "камень" and bot_thing == "ножницы": result = 2 elif thing == "ножницы" and bot_thing == "бумага": result = 2 elif thing == "бумага" and bot_thing == "камень": result = 2 elif thing == "ножницы" and bot_thing == "камень": result = 1 elif thing == "бумага" and bot_thing == "ножницы": result = 1 elif thing == "камень" and bot_thing == "бумага": result = 2 elif thing == "камень" and bot_thing == "камень": result = 0 elif thing == "ножницы" and bot_thing == "ножницы": result = 0 elif thing == "бумага" and bot_thing == "бумага": result = 0 if result == 2: response = f"Камень, ножницы, бумага
{thing} vs. {bot_thing}
Вы выиграли!" elif result == 1: response = f"Камень, ножницы, бумага
{thing} vs. {bot_thing}
Вы проиграли!" elif result == 0: response = f"Камень, ножницы, бумага
{thing} vs. {bot_thing}
Ничья!" with open("data_file.json", 'r') as write_file: try: data = json.load(write_file) except Exception: data = {} if str(self._CHAT_ID) not in data: data[str(self._CHAT_ID)] = {} data[str(self._CHAT_ID)]["games"] = 0 data[str(self._CHAT_ID)]["wins"] = 0 data[str(self._CHAT_ID)]["defeats"] = 0 data[str(self._CHAT_ID)]["draws"] = 0 if result == 2: data[str(self._CHAT_ID)]["games"] += 1 data[str(self._CHAT_ID)]["wins"] += 1 elif result == 1: data[str(self._CHAT_ID)]["games"] += 1 data[str(self._CHAT_ID)]["defeats"] += 1 elif result == 0: data[str(self._CHAT_ID)]["games"] += 1 data[str(self._CHAT_ID)]["draws"] += 1 with open("data_file.json", "w") as write_file: json.dump(data, write_file) return response else: return "Неверный аргумент
Использование команды:
!game *камень/ножницы/бумага/статистика*" def get_info_user(self, id): try: user_info = vk.method('users.get', {'user_ids': id, 'fields': 'verified,last_seen,sex'}) except vk_api.exceptions.ApiError as lol: err = "Ошибка метода users.get: " + str(lol) log(True, err) return "Пользователь не найден
" + str(lol) if "deactivated" in user_info[0]: if user_info[0]['deactivated'] == 'banned': return user_info[0]['first_name'] + " " + user_info[0]['last_name'] + " забанен" elif user_info[0]['deactivated'] == 'deleted': return "Профиль был удалён" if user_info[0]['is_closed']: is_closed = "Да" else: is_closed = "Нет" if user_info[0]['sex'] == 1: sex = "Женский" elif user_info[0]['sex'] == 2: sex = "Мужской" else: sex = "Неизвестно" if user_info[0]['last_seen']['platform'] == 1: platform = "m.vk.com" elif user_info[0]['last_seen']['platform'] == 2: platform = "iPhone" elif user_info[0]['last_seen']['platform'] == 3: platform = "iPad" elif user_info[0]['last_seen']['platform'] == 4: platform = "Android" elif user_info[0]['last_seen']['platform'] == 5: platform = "Windows Phone" elif user_info[0]['last_seen']['platform'] == 6: platform = "Windows 10" elif user_info[0]['last_seen']['platform'] == 7: platform = "vk.com" else: platform = "тип платформы неизвестен" time = datetime.datetime.fromtimestamp(user_info[0]['last_seen']['time']) answer = user_info[0]['first_name'] + " " + user_info[0]['last_name'] + "
Его ид: " + \ str(user_info[0]['id']) + "
Профиль закрыт: " + is_closed + "
Пол: " + sex \ + "
Последний онлайн: " + time.strftime('%d.%m.%Y в %H:%M:%S') + " (" + platform + ")" return answer def get_info_group(self, id): try: group_info = vk.method('groups.getById', {'group_id': id, 'fields': 'description,members_count'}) except vk_api.exceptions.ApiError as lol: err = "Ошибка метода groups.getById: " + str(lol) log(True, err) return "Группа не найдена
" + str(lol) info = "Результат метода API groups.getById: " + str(group_info) log(False, info) if group_info[0]['description'] == "": description = "Отсутствует" else: description = group_info[0]['description'] answer = group_info[0]['name'] + "
Описание: " + description + "
Ид группы: " + str( group_info[0]['id']) + "
Подписчиков: " + str(group_info[0]['members_count']) return answer def random_image(self): group = "-" + str(group_id) random_images_query = vk_mda.method('photos.get', {'owner_id': group, 'album_id': album_for_command, 'count': 1000}) info = "Результат метода photos.get: Получено " + str(random_images_query['count']) + " фото" log(False, info) random_number = random.randrange(random_images_query['count']) return "photo" + str(random_images_query['items'][random_number]['owner_id']) + "_" + str( random_images_query['items'][random_number]['id']) def wiki_article(self, search): w = wiki.Wikipedia('ru') page = w.page(search) if page.exists(): answer = page.title + "
" + page.summary else: answer = "Такой статьи не существует" return answer def exchange_rates(self): try: rates_USD = json.loads( requests.get("https://www.nbrb.by/api/exrates/rates/145?periodicity=0", timeout=10).text) rates_EUR = json.loads( requests.get("https://www.nbrb.by/api/exrates/rates/292?periodicity=0", timeout=10).text) rates_RUB = json.loads( requests.get("https://www.nbrb.by/api/exrates/rates/298?periodicity=0", timeout=10).text) return "Текущий курс валют по данным НБ РБ:
" + rates_USD['Cur_Name'] + ": " + str( rates_USD['Cur_Scale']) + " " + rates_USD['Cur_Abbreviation'] + " = " + str( rates_USD['Cur_OfficialRate']) + " BYN
" + rates_EUR['Cur_Name'] + ": " + str( rates_EUR['Cur_Scale']) + " " + rates_EUR['Cur_Abbreviation'] + " = " + str( rates_EUR['Cur_OfficialRate']) + " BYN
" + "Российский рубль" + ": " + str( rates_RUB['Cur_Scale']) + " " + rates_RUB['Cur_Abbreviation'] + " = " + str( rates_RUB['Cur_OfficialRate']) + " BYN" except Exception as mda: err = "Ошибка получения данных из НБ РБ API: " + str(mda) log(True, err) return "Невозможно получить данные из НБ РБ: " + str(mda) def send(self, message=None, attachment=None): message = vk.method('messages.send', {'peer_id': self._CHAT_ID, 'message': message, 'random_id': time.time(), 'attachment': attachment}) log(False, f'Бот id{self._CHAT_ID}: Ответ метода ВК "messages.send": {message}') def bots(): log(False, "Бот начал работу") debug_array['start_time'] = time.time() for event in MyVkLongPoll.listen(longpoll): try: if event.type == VkBotEventType.MESSAGE_NEW: log(False, f'Новое сообщение: {event.message}') debug_array['messages_get'] += 1 if int(event.message.peer_id) in bot: bot[event.message.peer_id].get_message(event.message.text, event.message.from_id) else: bot[event.message.peer_id] = VkBot(event.message.peer_id) users[event.message.peer_id] = {"midnight": False} update_users_json(users) bot[event.message.peer_id].get_message(event.message.text, event.message.from_id) except Exception as kek: err = "Беды с ботом: " + str(kek) log(True, err) continue def midnight(): while True: current_time = time.time()+10800 if int(current_time) % 86400 == 0: log(False, "Иницаилизация ивента \"Миднайт\"") for i in users: bot[int(i)].event("midnight") log(False, "Ивент \"Миднайт\" завершён") time.sleep(1) else: time.sleep(0.50) log(False, "Скрипт запущен, чтение users.json для восстановления обьектов ботов") load_users() tread_bots = threading.Thread(target=bots) tread_midnight = threading.Thread(target=midnight) tread_bots.start() tread_midnight.start()