Traffic Cardinal Traffic Cardinal написал 17.09.2024

Создание бота для уникализации рассылок в Telegram

Traffic Cardinal Traffic Cardinal написал 17.09.2024
19 мин
0
1298
Содержание

Главная проблема любой рассылки — ее однообразность. Во-первых, сам факт попадания в рассылку уже раздражает получателей, потому что наш Inbox и так ломится от рекламы. И можно было бы на это забить, но камон, хейт в конверсию стабильно превращает только Е. Ю. Во-вторых, массовая рассылка одного и того же сообщения приводит к быстрой блокировке. Чтобы этого избежать, достаточно сделать бот для уникализации рассылок. И да, сам бот делается очень просто, а вот интерфейс…

banner banner

Какие задачи решает бот для рассылки сообщений

Очевидно, что основная задача данного бота — уникализировать рассылки. Однако кроме этого, он также может быть использован для:

  • Повышения траста со стороны платформы — чем реже исходящие сообщения будут повторяться, тем меньше вероятность получить бан.

  • Повышения траста со стороны юзера — а здесь уже многое зависит от того, насколько творчески владелец бота подойдет к созданию своей воронки. Самое базовое — к одному и тому же пользователю можно обращаться разными формами его имени. Но концепт этим не ограничивается.

  • Интерактивной генерации контента — переделав часть, отвечающую за рассылку в ЛС, на код, который просто выводил бы сообщения в сам бот, можно сделать генератор псевдоуникального контента. Это можно использовать в сюжетных играх, агрегаторах гороскопов, анекдотов, предсказаний. Да в чем угодно, в общем-то. Тут все упирается лишь в вашу фантазию.

Впрочем, хватит лирики — перейдем к техничке.

Принцип работы бота для рассылки сообщений в Telegram

Алгоритм непосредственно программной части бота достаточно прост: все сводится к псевдослучайному жонглированию заранее заданными фрагментами текста и к контролю уникальности: бот следит за тем, чтобы комбинации не повторялись. Однако из-за особенностей Telegram интерфейс бота ощутимо раздувает код, а следовательно, и нагрузку на мозг изучающего код раз эдак в 5–10. Впрочем, кнопка есть кнопка.

Что же касается алгоритма, суть в следующем:

  1. Считывание лога, чтобы избежать дублирования.
  2. Считывание списка юзеров.
  3. Считывание списка текстов.
  4. Если в тексте есть плейсхолдер для %%обращения%% или &&фрагмента&&, считывает списки обращений или фрагментов соответственно.
  5. Случайным образом выбирает что-то из этих списков.
  6. Сверяется с логом — если комбинация уже была использована, генерирует уникальную.
  7. Записывает выбор в лог.
  8. Соединяет сгенерированный вариант текста, его отдельных фрагментов и обращения к пользователю между собой.
  9. Отсылает все это сообщением одному юзеру, повторяет все заново для следующего. И так пока список юзеров не закончится.
  10. Опционально боту может задаваться лайт- и хард-режим.
  11. В лайте он допускает всевозможные комбинации текста с фрагментами (полезно при массовой рассылке).
  12. В харде он все еще случайным образом подставляет в текст фрагменты, но сам шаблон текста использует лишь 1 раз (полезно при небольшой рассылке).
  13. Реагирует на сигналы элементов управления из меню.

Как видите, в целом ничего сложного нет. Но ох уж эти ваши кнопки Telegram… Сейчас увидите, сколько строк кода нужно ради простенького меню, и все поймете…

Пошаговая инструкция, как создать бота для рассылки сообщений в Telegram

Учитывая, что данный бот использует как user_api, так и bot_api, для его функционирования потребуется получение api_id, api_hash и bot_token. Чтобы это сделать, необходимо:

  1. Авторизоваться в веб-версии.
  2. Перейти в «API development tools» и заполнить форму.
  3. Скопировать api_id и api_hash.
  4. Затем нужно написать в ЛС BotFather и получить токен.
    После нужно настроить сервер. Если вы делаете это впервые, советуем изучить этот материал.
  5. Затем вводим в консоль сервера д:
    pip install asyncio
    pip install telethon

  6. После этого создаем на сервере файл .py с любым названием. Пусть это будет msg.py, добавляем в него следующее:
    import os
    from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkupfrom telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes
    import json
    import random
    from itertools import product
    from telethon.sync import TelegramClient
    import asyncio
    TOKEN = '
    ВАШ ТОКЕН БОТА'
    API_ID = '
    ВАШ API_ID'
    API_HASH = '
    ВАШ API_HASH'
    PHONE_NUMBER = '
    ВАШ ТЕЛЕФОН'
    SESSION_FILE = 'session_file.session'
    FRAGMENTS_DIR = 'fragments'
    FOLDER_PATH = 'fragments'
    user_states = {}
    current_user_id = None
    current_file_name = None
    def load_json(filename):
    try:
    with open(filename, 'r', encoding='utf-8') as file:
    data = json.load(file)
    return data
    except json.JSONDecodeError as e:
    print(f"Ошибка декодирования JSON в файле {filename}: {e}")
    except FileNotFoundError:
    print(f"Файл {filename} не найден.")
    except Exception as e:
    print(f"Неожиданная ошибка при загрузке {filename}: {e}")
    return None
    def save_json(data, filename):
    with open(filename, 'w') as file:
    json.dump(data, file, indent=4)
    def generate_combinations(text, fragments):
    flags = [f'&&{flag}&&' for flag in fragments.keys()]
    combinations = []
    flag_keys = []
    for flag in flags:
    if flag in text:
    replacement_texts = [(frag['TEXT'], frag['TEXT']) for frag in fragments[flag[2:-2]]]
    combinations.append(replacement_texts)
    flag_keys.append(flag[2:-2])
    else:
    combinations.append([('', None)])
    flag_keys.append(None)
    all_combos = list(product(*combinations))
    return all_combos, flag_keys
    def substitute_text(text, replacements):
    for key, value in replacements.items():
    text = text.replace(f'%%{key}%%', value)
    return text
    def is_combination_used(user_control, text_id, flag_combination, mode):
    if mode == 1:
    for entry in user_control:
    if entry['text_id'] == text_id and entry['flags'] == flag_combination:
    return True
    elif mode == 0:
    for entry in user_control:
    if entry['text_id'] == text_id:
    return True
    return False
    async def process_texts(users, texts, fragments, control, mode):
    new_control = control.copy()
    all_possible_combinations = []
    for text_data in texts:
    text_id = text_data['ID']
    text = text_data['TEXT']
    all_combos, flag_keys = generate_combinations(text, fragments)
    for combo in all_combos:
    flag_combination = {flag: str(frag[1]) for flag, frag in zip(flag_keys, combo) if flag}
    all_possible_combinations.append((text_id, flag_combination))
    random.shuffle(all_possible_combinations)
    async with TelegramClient(SESSION_FILE, API_ID, API_HASH) as client:
    if not await client.is_user_authorized():
    await client.send_code_request(PHONE_NUMBER)
    await client.sign_in(PHONE_NUMBER, input('Введите код из СМС: '))
    for user in users:
    user_id = user['ID']
    if str(user_id) not in new_control:
    new_control[str(user_id)] = []
    user_combinations = new_control[str(user_id)]
    available_combinations = [combo for combo in all_possible_combinations if not is_combination_used(user_combinations, *combo, mode)]
    if available_combinations:
    text_id, flag_combination = random.choice(available_combinations)
    if user['NAMES']:
    name_entry = random.choice(user['NAMES'])
    name_id = name_entry['ID']
    name = name_entry['name']
    else:
    name_id = None
    name = ""
    final_text = next(text_data['TEXT'] for text_data in texts if text_data['ID'] == text_id)
    for flag, replacement in flag_combination.items():
    final_text = final_text.replace(f'&&{flag}&&', str(replacement))
    final_text = substitute_text(final_text, {"name": name})
    user_combinations.append({
    "text_id": text_id,
    "names_id": name_id,
    "flags": flag_combination
    })
    try:
    await client.send_message(user['LINK'], final_text)
    print(f"Сообщение отправлено пользователю с ID {user_id} ({user['LINK']}): {final_text}")
    except Exception as e:
    print(f"Ошибка при отправке сообщения пользователю с ID {user_id} ({user['LINK']}): {str(e)}")
    else:
    print(f"Все комбинации использованы для пользователя {user_id}")
    return new_control
    async def run_go():
    random.seed()
    mode = 1
    users = load_json('users.json')
    texts = load_json('texts.json')
    fragments = {os.path.splitext(f)[0]: load_json(os.path.join(FOLDER_PATH, f)) for f in os.listdir(FOLDER_PATH)}
    control = load_json('control.json') if os.path.exists('control.json') else {}
    updated_control = await process_texts(users, texts, fragments, control, mode)
    save_json(updated_control, 'control.json')
    def go():
    loop = asyncio.get_event_loop()
    if loop.is_running():
    asyncio.ensure_future(run_go())
    else:
    loop.run_until_complete(run_go())
    def load_data(filename):
    with open(filename, 'r') as file:
    return json.load(file)
    def save_data(filename, data):
    with open(filename, 'w') as file:
    json.dump(data, file, indent=4)
    def list_fragment_files():
    return [f for f in os.listdir(FRAGMENTS_DIR) if f.endswith('.json')]
    def build_keyboard(data, level=1, delete_enabled=False):
    keyboard = []
    if level == 1:
    keyboard.append([InlineKeyboardButton("Пользователи", callback_data='users')])
    keyboard.append([InlineKeyboardButton("Тексты", callback_data='texts')])
    keyboard.append([InlineKeyboardButton("Фрагменты", callback_data='fragments')])
    keyboard.append([InlineKeyboardButton("GO", callback_data='go')])
    keyboard.append([InlineKeyboardButton("Mode", callback_data='mode')])
    elif level == 2:
    for item in data:
    keyboard.append([InlineKeyboardButton(item['LINK'], callback_data=f"user_{item['ID']}"),
    InlineKeyboardButton("Удалить", callback_data=f"delete_user_{item['ID']}")])
    keyboard.append([InlineKeyboardButton("Добавить", callback_data='add_user')])
    elif level == 3:
    for name in data:
    keyboard.append([InlineKeyboardButton(name['name'], callback_data=f"user_name_{name['ID']}"),
    InlineKeyboardButton("Удалить", callback_data=f"delete_name_{name['ID']}")])
    keyboard.append([InlineKeyboardButton("Добавить", callback_data='add_name')])
    elif level == 4:
    for item in data:
    keyboard.append([InlineKeyboardButton(item['TITLE'], callback_data=f"text_{item['ID']}"),
    InlineKeyboardButton("Удалить", callback_data=f"delete_text_{item['ID']}")])
    keyboard.append([InlineKeyboardButton("Добавить", callback_data='add_text')])
    elif level == 5:
    for file_name in data:
    keyboard.append([InlineKeyboardButton(file_name, callback_data=f"fragment_file_{file_name}"),
    InlineKeyboardButton("Удалить", callback_data=f"delete_file_{file_name}")])
    keyboard.append([InlineKeyboardButton("Добавить", callback_data='add_file')])
    elif level == 6:
    for item in data:
    keyboard.append([InlineKeyboardButton(item['TEXT'], callback_data=f"fragment_text_{item['ID']}"),
    InlineKeyboardButton("Удалить", callback_data=f"delete_fragment_{item['ID']}")])
    keyboard.append([InlineKeyboardButton("Добавить", callback_data='add_fragment')])
    return InlineKeyboardMarkup(keyboard)
    async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    user_states.pop(update.message.from_user.id, None)
    await update.message.reply_text(
    'Добро пожаловать! Выберите раздел:',
    reply_markup=build_keyboard([], level=1)
    )
    def mode():
    with open('mode.json', 'r') as file:
    data = json.load(file)
    data['mode'] = 1 - data['mode']
    with open('mode.json', 'w') as file:
    json.dump(data, file, indent=4)
    async def button(update: Update, context: ContextTypes.DEFAULT_TYPE):
    global current_user_id
    global current_file_name
    query = update.callback_query
    await query.answer()
    user_states.pop(query.from_user.id, None)
    try:
    if query.data == 'go':
    go()
    if query.data == 'mode':
    mode()
    elif query.data == 'users':
    data = load_data('users.json')
    await query.edit_message_text(
    text="Выберите пользователя:",
    reply_markup=build_keyboard(data, level=2, delete_enabled=True)
    )
    elif query.data.startswith('user_'):
    user_id = int(query.data.split('_')[1])
    data = load_data('users.json')
    user = next((item for item in data if item['ID'] == user_id), None)
    if user:
    await query.edit_message_text(
    text=f"Пользователь: {user['LINK']}\nВыберите имя:",
    reply_markup=build_keyboard(user['NAMES'], level=3, delete_enabled=True)
    )
    current_user_id=user['ID']
    else:
    await query.edit_message_text(text="Пользователь не найден.")
    elif query.data.startswith('user_name_'):
    name_id = int(query.data.split('_')[2])
    data = load_data('users.json')
    name = next((item for sublist in [d['NAMES'] for d in data] for item in sublist if item['ID'] == name_id), None)
    if name:
    await query.edit_message_text(text=f"Вы выбрали: {name['name']}")
    else:
    await query.edit_message_text(text="Имя не найдено.")
    elif query.data == 'texts':
    data = load_data('texts.json')
    await query.edit_message_text(
    text="Выберите текст:",
    reply_markup=build_keyboard(data, level=4, delete_enabled=True)
    )
    elif query.data.startswith('text_'):
    text_id = int(query.data.split('_')[1])
    data = load_data('texts.json')
    text = next((item for item in data if item['ID'] == text_id), None)
    if text:
    await query.edit_message_text(text=text['TEXT'])
    else:
    await query.edit_message_text(text="Текст не найден.")
    elif query.data == 'fragments':
    files = list_fragment_files()
    await query.edit_message_text(
    text="Выберите файл фрагментов:",
    reply_markup=build_keyboard(files, level=5, delete_enabled=True)
    )
    elif query.data.startswith('fragment_file_'):
    file_name = query.data.split('fragment_file_')[1]
    data = load_data(os.path.join(FRAGMENTS_DIR, file_name))
    current_file_name = file_name
    await query.edit_message_text(
    text="Выберите фрагмент:",
    reply_markup=build_keyboard(data, level=6, delete_enabled=True)
    )
    elif query.data.startswith('fragment_text_'):
    fragment_id = int(query.data.split('_')[2])
    file_name = query.data.split('_')[1]
    data = load_data(os.path.join(FRAGMENTS_DIR, file_name))
    fragment = next((item for item in data if item['ID'] == fragment_id), None)
    if fragment:
    await query.edit_message_text(text=fragment['TEXT'])
    else:
    await query.edit_message_text(text="Фрагмент не найден.")
    elif query.data.startswith('delete_'):
    if query.data.startswith('delete_user_'):
    user_id = int(query.data.split('_')[2])
    data = load_data('users.json')
    data = [item for item in data if item['ID'] != user_id]
    save_data('users.json', data)
    await query.edit_message_text(
    text="Пользователь удален.",
    reply_markup=build_keyboard(data, level=2, delete_enabled=True)
    )
    elif query.data.startswith('delete_name_'):
    name_id = int(query.data.split('_')[2])
    data = load_data('users.json')
    for user in data:
    user['NAMES'] = [name for name in user['NAMES'] if name['ID'] != name_id]
    save_data('users.json', data)и
    await query.edit_message_text(
    text="Имя удалено.",
    )
    user_id = int(query.data.split('_')[2])
    print(user_id)
    elif query.data.startswith('delete_text_'):
    text_id = int(query.data.split('_')[2])
    data = load_data('texts.json')
    data = [item for item in data if item['ID'] != text_id]
    save_data('texts.json', data)
    await query.edit_message_text(
    text="Текст удален.",
    reply_markup=build_keyboard(data, level=4, delete_enabled=True)
    )
    elif query.data.startswith('delete_file_'):
    file_name = query.data.split('delete_file_')[1]
    file_path = os.path.join(FRAGMENTS_DIR, file_name)
    if os.path.exists(file_path):
    os.remove(file_path)
    await query.edit_message_text(
    text="Файл удален.",
    reply_markup=build_keyboard(list_fragment_files(), level=5, delete_enabled=True)
    )
    else:
    await query.edit_message_text(text="Файл не найден.")
    elif query.data.startswith('delete_fragment_'):
    fragment_id = int(query.data.split('_')[2])
    file_name = query.data.split('_')[1]
    file_path = os.path.join(FRAGMENTS_DIR, file_name)
    data = load_data(file_path)
    data = [item for item in data if item['ID'] != fragment_id]
    save_data(file_path, data)
    await query.edit_message_text(
    text="Фрагмент удален.",
    reply_markup=build_keyboard(data, level=6, delete_enabled=True)
    )
    elif query.data.startswith('add_'):
    user_states[query.from_user.id] = query.data
    if query.data == 'add_user':
    await query.edit_message_text(text="Добавление пользователя: пришлите ссылку на профиль")
    elif query.data == 'add_name':
    await query.edit_message_text(text="Добавление обращения: введите обращение к пользователю")
    elif query.data == 'add_text':
    await query.edit_message_text(text="Добавление основного текста: введите текст")
    elif query.data == 'add_file':
    await query.edit_message_text(text="Создание фрагмента: укажите название файла")
    elif query.data == 'add_fragment':
    await query.edit_message_text(text="Создание фрагмента: введите текст фрагмента")
    else:
    await query.edit_message_text(text="Рассылка выполнена")
    except Exception as e:
    await query.edit_message_text(text=f"Произошла ошибка: {str(e)}")
    print(f"Error: {str(e)}")
    async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    global current_user_id
    global current_file_name
    user_id = update.message.from_user.id
    if user_id in user_states:
    action = user_states[user_id]
    user_states.pop(user_id)
    if action == 'add_user':
    user_link = update.message.text.strip()
    if user_link.startswith("https://t.me/"):
    data = load_data('users.json')
    new_id = max(user['ID'] for user in data) + 1 if data else 1
    new_user = {
    "ID": new_id,
    "LINK": user_link,
    "NAMES": []
    }
    data.append(new_user)
    save_data('users.json', data)
    await update.message.reply_text(f"Новый пользователь добавлен: {user_link}")
    else:
    await update.message.reply_text("Укажите ссылку в формате https://t.me/имя")
    elif action == 'add_name':
    user_id = current_user_id
    new_name = update.message.text.strip()
    data = load_data('users.json')
    user = next((u for u in data if u['ID'] == user_id), None)
    if user:
    new_name_id = max([name['ID'] for name in user['NAMES']], default=0) + 1
    user['NAMES'].append({"ID": new_name_id, "name": new_name})
    save_data('users.json', data)
    else:
    print(f"Пользователь с ID {user_id} не найден.")
    await update.message.reply_text(f"Сообщение получено для добавления в Имена пользователей: {update.message.text}")
    elif action == 'add_text':
    await update.message.reply_text("Добавление основного текста: введите заголовок")
    user_states[user_id] = 'waiting_for_text'
    context.user_data['new_text_content'] = update.message.text.strip()
    return
    elif action == 'waiting_for_text':
    new_text = context.user_data.get('new_text_content', '')
    new_title = update.message.text.strip()
    data = load_data('texts.json')
    new_id = max((item['ID'] for item in data), default=0) + 1
    new_text_entry = {
    "ID": new_id,
    "TITLE": new_title,
    "TEXT": new_text
    }
    data.append(new_text_entry)
    save_data('texts.json', data)
    await update.message.reply_text(f"Новый текст добавлен: {new_title}\n\n{new_text}")
    del context.user_data['new_text_content']
    elif action == 'add_file':
    file_name = update.message.text.strip()
    if not file_name.endswith('.json'):
    file_name += '.json'
    file_path = f"./fragments/{file_name}"
    with open(file_path, 'w') as json_file:
    json.dump([], json_file)
    await update.message.reply_text(f"Файл {file_name} создан.")
    elif action == 'add_fragment':
    fragment_text = update.message.text.strip()
    file_name = current_file_name
    print (file_name)
    if not file_name:
    await update.message.reply_text("Ошибка: не найден файл для добавления фрагмента.")
    return
    file_path = f"./fragments/{file_name}"
    if not os.path.exists(file_path):
    await update.message.reply_text("Ошибка: файл не найден. Пожалуйста, создайте файл сначала.")
    return
    with open(file_path, 'r') as json_file:
    try:
    fragments = json.load(json_file)
    except json.JSONDecodeError:
    fragments = []
    new_id = max((fragment['ID'] for fragment in fragments), default=0) + 1
    new_fragment = {
    "ID": new_id,
    "TEXT": fragment_text
    }
    fragments.append(new_fragment)
    with open(file_path, 'w') as json_file:
    json.dump(fragments, json_file, indent=4)
    await update.message.reply_text(f"Новый фрагмент добавлен:\nID: {new_id}\nTEXT: {fragment_text}")
    def main():
    application = Application.builder().token(TOKEN).build()
    application.add_handler(CommandHandler('start', start))
    application.add_handler(CallbackQueryHandler(button))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
    application.run_polling()
    if __name__ == '__main__':
    main()
  7. Заменяем 'ВАШ ТОКЕН БОТА', 'ВАШ API_ID', 'ВАШ API_HASH' и 'ВАШ ТЕЛЕФОН' — на свои api_id, api_hash, номер и сообщение соответственно.
  8. Сохраняем файл.
  9. Инициируем бота вводом:
    python bot.py.
  10. Авторизуем бота в Телеге. Вводим номер при появлении «Please enter your phone (or bot token)». Затем — код, который придет в Телегу.

Готово. Дальнейшая работа с ботом осуществляется уже через его интерфейс и не требует использования консоли.

Демонстрация работы

Процесс добавления пользователя
Процесс добавления пользователя

Процесс создания текста с вариантами обращения и двумя вариативными фрагментами
Процесс создания текста с вариантами обращения и двумя вариативными фрагментами

Процесс удаления текста
Процесс удаления текста

Процесс создания фрагментов текста
Процесс создания фрагментов текста

Процесс удаления фрагментов текста
Процесс удаления фрагментов текста

Процесс добавления вариантов обращения
Процесс добавления вариантов обращения

Процесс создания вариантов фрагментов
Процесс создания вариантов фрагментов

Процесс создания вариантов фрагментов
Процесс создания вариантов фрагментов

Запуск рассылки
Запуск рассылки

Демонстрация рассылки
Демонстрация рассылки

Обратите внимание, что в примерах для упрощения понимания использован текст-болванка и нет пауз между рассылками. Разумеется, при реальной рассылке так делать не нужно. Текст должен быть трастовым, обращения уместными, фрагменты согласованы с содержимым текста, а между рассылками должны быть паузы.

Как видите, уникализировать рассылку с помощью Telegram-бота не так уж и сложно. Куда сложнее подвязать ко всему этому делу интерфейс для удобства. А на этом у нас все. Следите за нашим Telegram-каналом, чтобы первыми получать код арбитражных ботов.

Здравствуйте! У вас включен блокировщик рекламы, часть сайта не будет работать!