Created
May 18, 2021 09:29
-
-
Save null2264/e21f33bd5fee6bf6fa49b9f532bac61d to your computer and use it in GitHub Desktop.
YouTube Notification using Discord Bot (Cog)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import discord | |
| import xmltodict | |
| from aiohttp import web | |
| from discord.ext import commands | |
| # THIS CODE IS MADE FOR TESTING; | |
| # SOME EDITS ARE REQUIRED BEFORE PRODUCTION USE | |
| # === HOW TO USE === | |
| # 1. Run the bot, then work out your callback link (default: "http://YOUR_PUBLIC_IP:5000/callback"; the server binds to 0.0.0.0, so replace YOUR_PUBLIC_IP with the publicly reachable IP or hostname of this machine) | |
| # 2. Go to "https://pubsubhubbub.appspot.com/subscribe", put in callback link, topic url, and mode "subscribe" | |
| # Topic url example: | |
| # - "https://www.youtube.com/xml/feeds/videos.xml?channel_id=CHANNEL_ID" | |
| # - "https://www.youtube.com/xml/feeds/videos.xml?channel_id=UC322TuEMTaEZEDWmfnzn9nQ" | |
class YouTube(commands.Cog):
    """Cog that relays YouTube PubSubHubbub push notifications to Discord.

    On construction the cog schedules :meth:`webserver`, which starts a small
    aiohttp server exposing:

    * ``GET /``          -- a plain "Hello, world" response.
    * ``GET /callback``  -- echoes the ``hub.challenge`` query parameter back
      to the caller (the hub's subscription verification request).
    * ``POST /callback`` -- parses the posted Atom XML and sends every video
      link it contains to the channel ``NOTIFY_CHANNEL_ID``.
    """

    # Discord channel that receives the notifications.
    NOTIFY_CHANNEL_ID = 745481731582197780
    # Interface and port the callback web server binds to.
    HOST = "0.0.0.0"
    PORT = 5000

    def __init__(self, bot):
        """Store ``bot`` and schedule the web server on the bot's event loop."""
        self.bot = bot
        # Defined up front so cog_unload() never hits a missing attribute when
        # the cog is unloaded before webserver() got a chance to run.
        self.runner = None
        self.site = None
        self.bot.loop.create_task(self.webserver())

    @staticmethod
    def _video_links(feed):
        """Return the first ``@href`` of each ``feed.entry`` in a parsed feed.

        ``feed`` is the mapping produced by ``xmltodict.parse``. Payloads
        without an ``entry`` element yield an empty list instead of raising.
        ``entry`` and ``link`` may each be a single mapping or a list of them
        (xmltodict returns a list when an element is repeated).
        """
        root = feed.get("feed") if isinstance(feed, dict) else None
        if not isinstance(root, dict):
            return []

        entries = root.get("entry") or []
        if not isinstance(entries, list):
            entries = [entries]

        urls = []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            links = entry.get("link") or []
            if not isinstance(links, list):
                links = [links]
            for link in links:
                if isinstance(link, dict) and link.get("@href"):
                    urls.append(link["@href"])
                    break  # one announcement per entry
        return urls

    async def webserver(self):
        """Build the aiohttp application and start serving once the bot is ready."""
        # Function-scope imports: only this method needs them.
        import logging
        from xml.parsers.expat import ExpatError

        log = logging.getLogger(__name__)

        async def handler(request):
            return web.Response(text="Hello, world")

        async def callbackPost(request):
            data = await request.text()
            try:
                ytData = xmltodict.parse(data)
            except ExpatError:
                # Empty or malformed body: reject it instead of failing with a 500.
                return web.Response(status=400, text="Invalid XML payload")

            channel = self.bot.get_channel(self.NOTIFY_CHANNEL_ID)
            if channel is None:
                log.warning(
                    "Channel %s not found; dropping YouTube notification",
                    self.NOTIFY_CHANNEL_ID,
                )
            else:
                for url in self._video_links(ytData):
                    await channel.send(
                        "New video update!\n{}\nTHIS IS A TEST".format(url)
                    )
            return web.Response(text=str(ytData))

        async def callbackGet(request):
            challenge = request.query.get("hub.challenge")
            if challenge is None:
                # Not a verification request; answer 400 rather than raising KeyError.
                return web.Response(status=400, text="Missing hub.challenge")
            return web.Response(text=challenge)

        app = web.Application()
        app.add_routes(
            [
                web.get("/", handler),
                web.post("/callback", callbackPost),
                web.get("/callback", callbackGet),
            ]
        )

        self.runner = web.AppRunner(app)
        await self.runner.setup()
        self.site = web.TCPSite(self.runner, self.HOST, self.PORT)
        await self.bot.wait_until_ready()
        await self.site.start()

    def cog_unload(self):
        """Shut the web server down when the cog is removed from the bot."""
        runner = self.runner
        if runner is None:
            # webserver() has not created the runner yet; nothing to stop.
            return
        self.runner = None
        self.site = None
        # cleanup() stops every registered site and releases the runner's
        # resources; unlike site.stop() it is safe before the site has started.
        self.bot.loop.create_task(runner.cleanup())

    # Kept under the original name; inside the class body it is mangled to
    # ``_YouTube__unload`` exactly as the original method was.
    __unload = cog_unload
def setup(bot):
    """Create a ``YouTube`` cog bound to ``bot`` and add it to the bot.

    NOTE(review): presumably invoked by discord.py's extension loader
    (``bot.load_extension``) -- confirm against how this module is loaded.
    """
    youtube_cog = YouTube(bot)
    bot.add_cog(youtube_cog)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment