Skip to content

Instantly share code, notes, and snippets.

@wahtherewahhere
Last active November 26, 2023 09:28
Show Gist options
  • Select an option

  • Save wahtherewahhere/77fd00de18be035fbe7d3d8886721750 to your computer and use it in GitHub Desktop.

Select an option

Save wahtherewahhere/77fd00de18be035fbe7d3d8886721750 to your computer and use it in GitHub Desktop.
LineBot+Python+FastAPI
'''
requirements:
uvicorn==0.24.0.post1
fastapi==0.104.1
line-bot-sdk==3.5.1
'''
import logging
import configparser
import uvicorn
from fastapi import FastAPI, Request, HTTPException, Response, BackgroundTasks
from linebot.v3 import (
WebhookHandler
)
from linebot.v3.exceptions import (
InvalidSignatureError
)
from linebot.v3.messaging import (
Configuration,
ApiClient,
MessagingApi,
ReplyMessageRequest,
TextMessage
)
from linebot.v3.webhooks import (
MessageEvent,
TextMessageContent
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# LINE credentials come from the [line] section of config.ini, which is
# looked up relative to the current working directory.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the
# list of files it actually parsed. Without this check a missing file only
# shows up later as an unexplained KeyError: 'line'.
if not config.read('config.ini'):
    raise FileNotFoundError(
        "config.ini not found or unreadable; it must provide a [line] "
        "section with ACCESS_TOKEN and CHANNEL_SECRET"
    )

# Messaging API client configuration (used when replying to events).
configuration = Configuration(access_token=config['line']['ACCESS_TOKEN'])
# Webhook handler that dispatches events; built from the channel secret.
handler = WebhookHandler(config['line']['CHANNEL_SECRET'])
app = FastAPI()
@app.api_route("/callback", methods=['POST'])
async def callback(request: Request, background_tasks: BackgroundTasks):
    """Accept a LINE webhook POST and queue it for processing.

    The signature header and the request body are read, logged, and passed
    to ``proxy_to_line_handler`` through ``background_tasks``; a plain-text
    ``OK`` response is then returned. The signature is not verified here.

    Args:
        request: Incoming HTTP request.
        background_tasks: Task queue used to schedule the handler call.

    Returns:
        A ``text/plain`` response with the content ``OK``.

    Raises:
        HTTPException: 400 if the ``X-Line-Signature`` header is missing or
            the body cannot be decoded as UTF-8.
    """
    # A request without the signature header can never be verified; answer
    # with 400 instead of letting a KeyError turn into a 500.
    signature = request.headers.get('X-Line-Signature')
    if signature is None:
        raise HTTPException(
            status_code=400, detail="Missing X-Line-Signature header"
        )

    # get request body as text
    try:
        body = (await request.body()).decode()
    except UnicodeDecodeError as err:
        # A malformed body is a client error, not a server failure.
        raise HTTPException(
            status_code=400, detail="Request body is not valid UTF-8"
        ) from err

    logger.info("Request Signature: %s", signature)
    logger.info("Request body: %s", body)
    background_tasks.add_task(
        proxy_to_line_handler, body=body, signature=signature
    )
    return Response(content='OK', media_type="text/plain")
def proxy_to_line_handler(body: str, signature: str) -> None:
    """Pass a webhook payload to the LINE SDK handler.

    Args:
        body: Request body as text.
        signature: Value of the ``X-Line-Signature`` request header.

    An ``InvalidSignatureError`` raised by ``handler.handle`` is logged and
    not propagated; any other exception is left to propagate.
    """
    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        # WARNING rather than INFO: a rejected signature means the payload
        # was dropped, and must stay visible if the log level is raised.
        logger.warning(
            "Invalid signature. Please check your channel access token/channel secret."
        )
@handler.add(MessageEvent, message=TextMessageContent)
def handle_message(event):
    """Reply to a text message event with the same text it carried.

    Args:
        event: The message event registered for above; its ``message.text``
            and ``reply_token`` attributes are read.
    """
    with ApiClient(configuration) as client:
        incoming_text = event.message.text
        logger.info("LINE message: " + incoming_text)
        # handle message here
        messaging_api = MessagingApi(client)
        echo_request = ReplyMessageRequest(
            reply_token=event.reply_token,
            messages=[TextMessage(text=incoming_text)],
        )
        messaging_api.reply_message_with_http_info(echo_request)
if __name__ == "__main__":
    # Start uvicorn when the file is executed directly. The "main:app"
    # import string presumably requires this file to be saved as main.py;
    # confirm before renaming the module.
    server_options = {
        "host": "127.0.0.1",
        "port": 10000,
        "reload": True,
    }
    uvicorn.run("main:app", **server_options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment