import logging import os import time from datetime import datetime from aiogram.dispatcher import FSMContext from aiogram.types import CallbackQuery, InlineKeyboardMarkup, KeyboardButton from data.config import HIDE_TITLE from keyboards.default.cancel import cancel from keyboards.default.register_kb import register_kb from keyboards.inline.dialog_calendar import DialogCalendar, calendar_callback from keyboards.inline.edit import edit_keyboard from keyboards.inline.event import subscribe_keyboard from loader import dp from aiogram.utils.markdown import escape_md from states.states import EventState from utils.create_xlsx import create_table from utils.misc.throttling import rate_limit from utils.mongo.events_class import Events from utils.mongo.user_class import User from .events import print_events # async def check_event(data, admin: bool = False): # """Проверка существования мероприятия""" # event = await Events.get_one(data) # print('event', event.get('status'), admin) # # Если такое мероприятие есть и у него статус True # if not event: # return False # elif event.get('status'): # return False # elif not event.get('status'): # если скрыто # if admin: # return True # # # Если открыто или админ # # if event.get('status') : # return False @rate_limit(3) @dp.callback_query_handler(text_startswith='delete_') async def delete_callback_handler(query: CallbackQuery): '''Если нажата кнопка удаления''' _id: str = query.data.split('_')[1] user = await User(user_id=query.message.chat.id).get_info() if not user: return await query.message.answer(text='Вы не зарегистрированны\!', reply_markup=register_kb) if not user.get('is_admin'): return await query.answer('У вас недостаточно прав!') event: bool | dict = await Events.get_one(_id) delete: bool = await Events.delete(_id=_id) if event and delete: await query.answer('Мероприятие удалено!') await query.message.delete() else: return await query.answer('Ошибка!') @rate_limit(3) @dp.callback_query_handler(text_startswith='view_participants') async def view_participants_handler(query: CallbackQuery): '''Участники мероприятия''' _id: str = query.data.split('_')[2] user = await User(user_id=query.message.chat.id).get_info() if not user: return await query.message.answer(text='Вы не зарегистрированны\!', reply_markup=register_kb) if not user.get('is_admin'): return await query.answer('У вас недостаточно прав!') event: bool | dict = await Events.get_one(_id) if not event: return await query.answer('Ошибка!') if not event.get('participants'): return await query.answer('Ещё нет участников!') file_name = f"{datetime.utcfromtimestamp(event.get('timestamp')).strftime('%Y-%m-%d')}_{event.get('title').replace(' ', '_')}" file_path = os.path.join('file', f'{file_name}.xlsx') # f'file\\{file_name}.xlsx' await create_table(file_path=file_path, participants=event.get('participants')) await query.message.answer_document(open(file_path, 'rb')) os.remove(file_path) await query.answer('') @rate_limit(3) @dp.callback_query_handler(text_startswith='hide_') async def hide_callback_handler(query: CallbackQuery): '''Спрятать мероприятие''' _id = query.data.split('_')[1] user = await User(user_id=query.message.chat.id).get_info() if not user: return await query.message.answer(text='Вы не зарегистрированны\!', reply_markup=register_kb) if not user.get('is_admin'): return await query.answer('У вас недостаточно прав!') event: bool | dict = await Events.get_one(_id) hide: bool = await Events.hide_or_show(_id=_id, type=0) if event and hide: await query.answer('Мероприятие спрятано!') await query.message.edit_caption(caption=HIDE_TITLE+escape_md(query.message.caption)) return await query.message.edit_reply_markup(reply_markup=edit_keyboard(_id=_id, status=0)) return await query.answer('Ошибка!') @rate_limit(3) @dp.callback_query_handler(text_startswith='show_') async def show_callback_handler(query: CallbackQuery): '''Показать мероприятие''' _id = query.data.split('_')[1] user = await User(user_id=query.message.chat.id).get_info() if not user: return await query.message.answer(text='Вы не зарегистрированны\!', reply_markup=register_kb) if not user.get('is_admin'): return await query.answer('У вас недостаточно прав!') event: bool | dict = await Events.get_one(_id) show: bool = await Events.hide_or_show(_id=_id, type=1) if event and show: await query.answer('Мероприятие доступно для всех!') await query.message.edit_caption(caption=escape_md(query.message.caption.replace(HIDE_TITLE, ''))) return await query.message.edit_reply_markup(reply_markup=edit_keyboard(_id=_id, status=1)) return await query.answer('Ошибка!') @rate_limit(3) @dp.callback_query_handler(text_startswith='event_') async def event_callback_handler(query: CallbackQuery): '''Показать мероприятие по id''' user = await User(user_id=query.message.chat.id).get_info() if not user: return await query.message.answer(text='Вы не зарегистрированны\!', reply_markup=register_kb) query_data = query.data.split('_') _id = query_data[1] event = await Events.get_one(_id) if not event: return await query.answer('Ошибка!') await query.answer(' ') await print_events( message=query, events=[event], edit=True if query_data[-1] == 'edit' else False, is_admin=user.get('is_admin') ) @rate_limit(3) @dp.callback_query_handler(text_startswith='my_') async def my_callback_handler(query: CallbackQuery): '''Вывести месяца в которых были мероприятия''' user = await User(user_id=query.message.chat.id).get_info() if not user: return await query.message.answer(text='Вы не зарегистрированны\!', reply_markup=register_kb) if not user.get('is_admin'): return await query.answer('У вас недостаточно прав!') all_my_events = await Events.get_all_my( user_id=query.message.chat.id, is_admin=user.get('is_admin') ) # if not all_my_events: TODO # return await query.message.answer('До этого мероприятий не было\!') all_my_events_dict = {} # Сортируем по годам for event in all_my_events: event_datetime = datetime.utcfromtimestamp(event.get('timestamp')) year = event_datetime.year try: all_my_events_dict[year].append(event) except: all_my_events_dict[year] = [event] await query.answer(' ') for event_year, events in all_my_events_dict.items(): keyboard = InlineKeyboardMarkup() for event in events: keyboard.row(KeyboardButton(event.get('title'), callback_data=f'event_{event.get("_id")}')) await query.message.answer(f'*{event_year}*', reply_markup=keyboard) @rate_limit(3) @dp.callback_query_handler(text_startswith='else_') async def else_callback_handler(query: CallbackQuery): '''Вывести года и мероприятия в них''' user = await User(user_id=query.message.chat.id).get_info() if not user: return await query.message.answer(text='Вы не зарегистрированны\!', reply_markup=register_kb) if not user.get('is_admin'): return await query.answer('У вас недостаточно прав!') all_events = await Events.get_all(is_admin=user.get('is_admin')) if not all_events: return await query.message.answer('До этого мероприятий не было\!') else_events_dict = {} # Сортируем по годам for event in all_events: event_datetime = datetime.utcfromtimestamp(event.get('timestamp')) year = event_datetime.year try: else_events_dict[year].append(event) except: else_events_dict[year] = [event] await query.answer(' ') for event_year, events in else_events_dict.items(): keyboard = InlineKeyboardMarkup() for event in events: keyboard.row() keyboard.insert(KeyboardButton(event.get('title'), callback_data=f'event_{event.get("_id")}{"_edit" if query.data.split("_")[-1] == "edit" else ""}')) await query.message.answer(f'*{event_year}*', reply_markup=keyboard) @rate_limit(3) @dp.callback_query_handler(lambda call: len(call.data) == 24) # Подходит только id мероприятия async def id_callback_handler(query: CallbackQuery): answer_data = query.data # Проверяем существование пользователя user = await User(user_id=query.message.chat.id).get_info() if not user: return await query.message.answer(text='Вы не зарегистрированны\!', reply_markup=register_kb) event = await Events.get_one(answer_data) if not event: return await query.answer('Такого мероприятие уже нет или оно не доступно!') ################################ # Если мероприятие уже прошло if event.get('timestamp') < int(time.time()): return await query.answer('Мероприятие уже прошло!') # Отписываемся if query.message.chat.id in event.get('participants'): await Events.delete_participant(user_id=query.message.chat.id, event_id=event.get('_id')) await query.answer('Вы больше не участник!') my_updated_event = await Events.get_one(event.get('_id')) return await query.message.edit_reply_markup(reply_markup=subscribe_keyboard( participants=len(my_updated_event.get('participants')), places=my_updated_event.get('places'), _id=str(my_updated_event.get('_id')), closed=True if len(my_updated_event.get('participants')) == my_updated_event.get('places') else False, iam_participating=True if not query.from_user.id in event.get('participants') else False, is_admin=user.get('is_admin') )) ################################ # Проверяем свободные места if len(event.get('participants')) >= event.get('places'): return await query.answer('Места закончились!') # Подписываемся try: await Events.add_participant(user_id=query.message.chat.id, event_id=answer_data) await query.answer('Вы успешно зарегистрировались!') event = await Events.get_one(answer_data) return await query.message.edit_reply_markup(reply_markup=subscribe_keyboard( participants=len(event.get('participants')), places=event.get('places'), _id=str(event.get('_id')), closed=True if len(event.get('participants')) == event.get('places') else False, iam_participating=True if query.from_user.id in event.get('participants') else False, is_admin=user.get('is_admin') )) except Exception as e: logging.error(e) await query.answer('Что то пошло не так!') @rate_limit(3) @dp.callback_query_handler(calendar_callback.filter(), state='*') async def dialog_calendar(query: CallbackQuery, callback_data: dict, state: FSMContext): """Календарь""" selected, date = await DialogCalendar().process_selection(query, callback_data) if selected: async with state.proxy() as data: data['timestamp'] = int(date.timestamp()) await query.message.answer( 'Введи кол\-во участников мероприятия', reply_markup=cancel() ) await EventState.next()
Public
Copy-paste lead to errors very, very often because developers forget to change value on one of copy-pasted lines . You should avoid it as much as possible. Usually a good solution is to extract the things that differ into separate variables.
Better create a decorator which will check user permissions.
Seems like things could be organized in a better way.
Returning bool or dict is weird. Better to return dict or raise an Exception if anything goes wrong.
Whatever you want to code, usually someone has already done that. And usually there are some tools to cover your case. And usually you can find them using Google.
Just use built-in sort
and groupby
.
from itertools import groupby from operator import itemgetter for event in events: event['datetime'] = datetime.utcfromtimestamp(event.get('timestamp')) events.sort(itemgetter('datetime')) events_by_year = groupby(events, key=lambda event: event['datetime'].year) for year, events in events_by_year: ...
Whatever you want to code, usually someone has already done that. And usually there are some tools to cover your case. And usually you can find them using Google.
all_my_events_dict = defaultdict(list)
Exceptions should be easy to catch. If your code throws only Exception
or ValueError
, then it's very hard to catch specific errors, because all thrown exception classes are the same. Create application-specific exceptions, so that every logical error has its own exception class: class VerySpecificException(Exception): pass
Functions should be small - at least fit your screen's height. Otherwise they will be hard to read and hard to test. Try splitting big function into smaller ones.
Create new review request