本文主要使用的技术栈:Django、Channels(Daphne)、channels_redis(Redis)、aiofiles、Vue、xterm.js
主要实现功能:
通过websocket实时查看指定文件日志,类似tail -f 命令
一,channels安装及配置
参考官方文档:Installation — Channels 4.0.0 documentation
a.通过pip安装
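python -m pip install -U 'channels[daphne]'
后文的 consumers.py 和 CHANNEL_LAYERS 配置还依赖 aiofiles 与 channels_redis,需要一并安装:
python -m pip install -U aiofiles channels_redis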
b.在 Django 配置文件(settings.py)的 INSTALLED_APPS 最前面添加 daphne
INSTALLED_APPS = (
    "daphne",  # must be the first entry
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.sites",
    ...
)
c.创建一个 message 的app
python3 manage.py startapp message
该操作会创建一个 message的目录,文件如下
message/
    __init__.py
    admin.py
    apps.py
    migrations/
        __init__.py
    models.py
    tests.py
    views.py
将该app注册到settings里面
INSTALLED_APPS = (
    "daphne",  # must be the first entry
    "message",  # the app created in the previous step
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.sites",
    ...
)
d.在 message 目录下新增路由文件 routing.py
from django.urls import re_path

from . import consumers

# WebSocket routes for the message app.
websocket_urlpatterns = [
    # ws/message/<username>/<token>
    # The groups must be named: RobotMessageConsumer.connect() looks up
    # "username" and "token" in scope["url_route"]["kwargs"].
    # The username character class is kept exactly as written; only the
    # redundant outer "+" around the group was dropped, since a nested
    # quantifier matches the same strings but invites heavy backtracking.
    re_path(
        r"ws/message/(?P<username>[\w+|\-?]+)/(?P<token>\w+)$",
        consumers.RobotMessageConsumer.as_asgi(),
    ),
]
e.在 message 目录下新增 consumers 文件 consumers.py
import asyncio
import codecs
import logging
import os

import aiofiles
from channels.generic.websocket import AsyncJsonWebsocketConsumer

logger = logging.getLogger(__name__)


class RobotMessageConsumer(AsyncJsonWebsocketConsumer):
    """Stream the end of a file to a WebSocket client, similar to ``tail -f``.

    The client sends ``{"filepath": "..."}``; the consumer answers with
    ``{"message": <text>, "filepath": <path>}`` frames until it disconnects.
    """

    # How far back from the end of the file the first frame starts.
    TAIL_BYTES = 4096 * 5
    # Size of each read while following the file.
    CHUNK_SIZE = 4096
    # Seconds to wait between reads / between checks for a missing file.
    READ_INTERVAL = 0.2
    MISSING_FILE_INTERVAL = 0.5

    def __init__(self, *args, **kwargs):
        # Unpack when forwarding: passing ``args, kwargs`` as-is would hand the
        # base class a tuple and a dict as two positional arguments.
        super().__init__(*args, **kwargs)
        self.room_group_name = None
        self.disconnected = True
        # Background task that follows the file requested by the client.
        self._tail_task = None

    async def connect(self):
        """Accept the socket when the URL's username equals its token."""
        url_kwargs = self.scope["url_route"]["kwargs"]
        username = url_kwargs.get('username')
        token = url_kwargs.get('token')
        # NOTE(review): comparing username to token is only a placeholder
        # check; replace it with real authentication before production use.
        if username == token:
            self.disconnected = False
            self.room_group_name = f"message_{username}"
            # Join room group
            await self.channel_layer.group_add(self.room_group_name, self.channel_name)
            await self.accept()
        else:
            logger.error("username:%s token:%s auth failed", username, token)
            await self.close()

    async def disconnect(self, close_code):
        """Stop following the file and leave the room group."""
        self.disconnected = True
        self._stop_tail()
        if self.room_group_name:
            await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    # Receive message from WebSocket
    async def receive_json(self, content, **kwargs):
        """Start following the file named by ``content["filepath"]``.

        The follow loop runs in a background task. Channels dispatches a
        consumer's messages one at a time, so looping inside this handler
        would keep the disconnect message from ever being processed.
        """
        filepath = content.get('filepath') if isinstance(content, dict) else None
        if not isinstance(filepath, str) or not filepath:
            logger.warning('ignoring request without a valid filepath: {!r}'.format(content))
            return
        # NOTE(review): filepath comes straight from the client, so any file
        # readable by the server process can be streamed. Restrict it to a
        # known log directory before exposing this endpoint.
        self._stop_tail()  # a new request replaces the previous one
        task = asyncio.create_task(self.async_handle_task(filepath))
        task.add_done_callback(self._on_tail_done)
        self._tail_task = task

    def _stop_tail(self):
        """Cancel the running follow task, if there is one."""
        task, self._tail_task = self._tail_task, None
        if task is not None and not task.done():
            task.cancel()

    @staticmethod
    def _on_tail_done(task):
        """Log an unexpected failure of the follow task."""
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.warning('file log tail task failed: {}'.format(exc))

    async def async_handle_task(self, filepath):
        """Wait for ``filepath`` to exist, then stream it."""
        while not self.disconnected:
            if os.path.exists(filepath):
                await self.send_task_log(filepath)
                break
            # Progress dot while the file has not been created yet.
            await self.send_json({'message': '.', 'filepath': filepath})
            await asyncio.sleep(self.MISSING_FILE_INTERVAL)

    async def send_task_log(self, filepath):
        """Send the last ``TAIL_BYTES`` of the file, then any appended data."""
        await self.send_json({'message': '\r\n'})
        # An incremental decoder keeps a multi-byte UTF-8 character that is
        # split across two reads intact; decoding each chunk on its own with
        # errors='ignore' would silently drop it.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
        try:
            logger.debug('file log path: {}'.format(filepath))
            async with aiofiles.open(filepath, 'rb') as log_f:
                await log_f.seek(0, os.SEEK_END)
                size = await log_f.tell()
                await log_f.seek(max(0, size - self.TAIL_BYTES))
                while not self.disconnected:
                    data = await log_f.read(self.CHUNK_SIZE)
                    if data:
                        # Terminals need CR+LF to return to column 0.
                        text = decoder.decode(data.replace(b'\n', b'\r\n'))
                        if text:
                            await self.send_json({'message': text, 'filepath': filepath})
                    await asyncio.sleep(self.READ_INTERVAL)
        except OSError as e:
            logger.warning('file log path open failed: {}'.format(e))
f.修改asgi.py,增加路由配置
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'server.settings')

# Build the Django ASGI application first so Django is set up before the
# routing module (and the consumers it imports) is loaded.
django_asgi_app = get_asgi_application()

import message.routing  # noqa: E402  (must stay below get_asgi_application())

# Plain HTTP goes to Django; WebSocket connections are origin-checked,
# wrapped in the auth middleware and routed to the message app's consumers.
application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": AllowedHostsOriginValidator(
            AuthMiddlewareStack(URLRouter(message.routing.websocket_urlpatterns))
        ),
    }
)
g.在settings.py中增加asgi 支持
# Dotted path to the ASGI application object.
ASGI_APPLICATION = "server.asgi.application"

# Channel layer backed by Redis (provided by the channels_redis package).
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            # NOTE(review): the Redis password is hard-coded in this URL;
            # load it from the environment or a secret store in real
            # deployments.
            "hosts": ["redis://:nineven@127.0.0.1:6379/0"],
        },
    },
}
二,vue及配置
yarn add xterm
yarn add xterm-addon-fit
创建 MessageTail.vue文件,内容如下:
查看日志
上一篇:总结791
下一篇:重载运算符操作说明(一)