351 lines
13 KiB
Python
351 lines
13 KiB
Python
import sys
|
|
import socket
|
|
import threading
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from PySide6.QtWidgets import (
|
|
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QTextEdit,
|
|
QLineEdit, QPushButton, QLabel, QListWidget, QListWidgetItem,
|
|
QMessageBox, QDialog, QDialogButtonBox, QFormLayout, QLineEdit as QLE
|
|
)
|
|
from PySide6.QtCore import Qt, Signal, QObject
|
|
from PySide6.QtGui import QIcon
|
|
|
|
CONFIG_FILE = "pirc_config.json"
|
|
|
|
|
|
class IRCWorker(QObject):
|
|
message_received = Signal(str)
|
|
system_message = Signal(str)
|
|
user_list_update = Signal(list)
|
|
disconnected = Signal()
|
|
|
|
def __init__(self, server, port, nick, channels):
|
|
super().__init__()
|
|
self.server = server
|
|
self.port = port
|
|
self.nick = nick
|
|
self.channels = channels
|
|
self.running = True
|
|
self.sock = None
|
|
self.current_users = []
|
|
|
|
def start(self):
|
|
try:
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.sock.connect((self.server, self.port))
|
|
self.sock.send(f"NICK {self.nick}\r\n".encode("utf-8"))
|
|
self.sock.send(f"USER {self.nick} 0 * :PIRC User\r\n".encode("utf-8"))
|
|
|
|
for channel in self.channels:
|
|
self.sock.send(f"JOIN {channel}\r\n".encode("utf-8"))
|
|
self.system_message.emit(f"Joined {channel}")
|
|
|
|
threading.Thread(target=self.listen, daemon=True).start()
|
|
except Exception as e:
|
|
self.system_message.emit(f"Connection failed: {e}")
|
|
self.disconnected.emit()
|
|
|
|
def listen(self):
|
|
try:
|
|
while self.running:
|
|
data = self.sock.recv(4096).decode("utf-8", errors="ignore")
|
|
if not data:
|
|
break
|
|
|
|
for line in data.split("\r\n"):
|
|
if not line:
|
|
continue
|
|
|
|
# PING/PONG
|
|
if line.startswith("PING"):
|
|
self.sock.send(f"PONG {line.split()[1]}\r\n".encode("utf-8"))
|
|
continue
|
|
|
|
# Ignore raw handshake notices
|
|
if line.startswith(":") and "NOTICE * :***" in line:
|
|
continue
|
|
|
|
parts = line.split()
|
|
if len(parts) > 1 and parts[1].isdigit():
|
|
code = int(parts[1])
|
|
if code == 353: # NAMES list
|
|
name_parts = line.split(":", 2)
|
|
if len(name_parts) < 3:
|
|
continue
|
|
users_chunk = name_parts[2].split()
|
|
self.current_users.extend(users_chunk)
|
|
elif code == 366: # End of NAMES list
|
|
self.user_list_update.emit(self.current_users)
|
|
self.current_users = []
|
|
elif code in [0o01, 332, 333]:
|
|
self.system_message.emit(line)
|
|
continue
|
|
# PRIVMSG
|
|
if "PRIVMSG" in line:
|
|
msg_parts = line.split(":", 2)
|
|
if len(msg_parts) >= 3:
|
|
prefix = msg_parts[1].split("!")[0]
|
|
message = msg_parts[2]
|
|
self.message_received.emit(f"<{prefix}> {message}")
|
|
continue
|
|
|
|
# JOIN
|
|
if "JOIN" in line:
|
|
user = line.split("!")[0].replace(":", "")
|
|
chan = line.split("JOIN")[1].strip()
|
|
if user not in self.current_users:
|
|
self.current_users.append(user)
|
|
self.user_list_update.emit(self.current_users)
|
|
self.system_message.emit(f"{user} joined {chan}")
|
|
continue
|
|
|
|
# PART/QUIT
|
|
if "PART" in line or "QUIT" in line:
|
|
user = line.split("!")[0].replace(":", "")
|
|
if user in self.current_users:
|
|
self.current_users.remove(user)
|
|
self.user_list_update.emit(self.current_users)
|
|
self.system_message.emit(f"{user} left")
|
|
continue
|
|
|
|
# Other system messages
|
|
self.system_message.emit(line)
|
|
except Exception as e:
|
|
self.system_message.emit(f"Error: {e}")
|
|
finally:
|
|
self.disconnected.emit()
|
|
|
|
def send_message(self, channel, message):
|
|
try:
|
|
self.sock.send(f"PRIVMSG {channel} :{message}\r\n".encode("utf-8"))
|
|
except Exception as e:
|
|
self.system_message.emit(f"Failed to send message: {e}")
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
try:
|
|
self.sock.send(b"QUIT :Goodbye!\r\n")
|
|
self.sock.close()
|
|
except:
|
|
pass
|
|
|
|
|
|
class AddServerDialog(QDialog):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.setWindowTitle("Add Server")
|
|
self.setFixedSize(300, 200)
|
|
layout = QFormLayout()
|
|
self.server_input = QLE()
|
|
self.port_input = QLE()
|
|
self.port_input.setText("6667")
|
|
self.channels_input = QLE()
|
|
layout.addRow("Server:", self.server_input)
|
|
layout.addRow("Port:", self.port_input)
|
|
layout.addRow("Channels (comma-separated):", self.channels_input)
|
|
buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
|
|
buttons.accepted.connect(self.accept)
|
|
buttons.rejected.connect(self.reject)
|
|
layout.addWidget(buttons)
|
|
self.setLayout(layout)
|
|
|
|
def get_data(self):
|
|
return (
|
|
self.server_input.text().strip(),
|
|
int(self.port_input.text().strip()),
|
|
[c.strip() for c in self.channels_input.text().split(",") if c.strip()]
|
|
)
|
|
|
|
|
|
class PIRCClient(QWidget):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.setWindowTitle("PIRC — Python IRC Client")
|
|
self.setGeometry(100, 100, 1000, 600)
|
|
self.worker = None
|
|
|
|
self.load_config()
|
|
self.init_ui()
|
|
|
|
if self.favorites:
|
|
first = self.favorites[0]
|
|
self.connect_to_server(first["server"], first.get("port", 6667), first["channels"])
|
|
else:
|
|
self.system_message("No favorite servers found — click the + button to connect.")
|
|
|
|
def load_config(self):
|
|
if os.path.exists(CONFIG_FILE):
|
|
with open(CONFIG_FILE, "r") as f:
|
|
cfg = json.load(f)
|
|
self.nick = cfg.get("username", "PIRCUser")
|
|
self.favorites = cfg.get("favorites", [])
|
|
else:
|
|
self.nick = "PIRCUser"
|
|
self.favorites = []
|
|
|
|
def save_config(self):
|
|
cfg = {
|
|
"username": self.nick,
|
|
"favorites": self.favorites
|
|
}
|
|
with open(CONFIG_FILE, "w") as f:
|
|
json.dump(cfg, f, indent=4)
|
|
|
|
def init_ui(self):
|
|
layout = QHBoxLayout()
|
|
|
|
# Server sidebar
|
|
sidebar_layout = QVBoxLayout()
|
|
self.server_list = QListWidget()
|
|
for fav in self.favorites:
|
|
item = QListWidgetItem(fav["server"])
|
|
self.server_list.addItem(item)
|
|
self.server_list.itemClicked.connect(self.server_selected)
|
|
sidebar_layout.addWidget(self.server_list)
|
|
|
|
# Add server button with + icon
|
|
self.add_server_button = QPushButton()
|
|
self.add_server_button.setIcon(QIcon("add.png"))
|
|
self.add_server_button.setText("") # icon only
|
|
self.add_server_button.clicked.connect(self.add_server)
|
|
self.disconnect_button = QPushButton("Disconnect")
|
|
self.disconnect_button.clicked.connect(self.disconnect_server)
|
|
sidebar_layout.addWidget(self.add_server_button)
|
|
sidebar_layout.addWidget(self.disconnect_button)
|
|
layout.addLayout(sidebar_layout, 2)
|
|
|
|
# Chat + user list
|
|
chat_layout = QVBoxLayout()
|
|
self.info_label = QLabel("Disconnected")
|
|
self.info_label.setStyleSheet("font-weight: bold; padding: 4px;")
|
|
chat_layout.addWidget(self.info_label)
|
|
|
|
chat_area_layout = QHBoxLayout()
|
|
self.chat_display = QTextEdit()
|
|
self.chat_display.setReadOnly(True)
|
|
chat_area_layout.addWidget(self.chat_display, 7)
|
|
|
|
# User list sidebar
|
|
self.user_list = QListWidget()
|
|
chat_area_layout.addWidget(self.user_list, 3)
|
|
|
|
chat_layout.addLayout(chat_area_layout, 8)
|
|
|
|
# Message input
|
|
input_layout = QHBoxLayout()
|
|
self.message_input = QLineEdit()
|
|
self.message_input.returnPressed.connect(self.send_message)
|
|
self.send_button = QPushButton("Send")
|
|
self.send_button.clicked.connect(self.send_message)
|
|
input_layout.addWidget(self.message_input)
|
|
input_layout.addWidget(self.send_button)
|
|
chat_layout.addLayout(input_layout)
|
|
|
|
layout.addLayout(chat_layout, 8)
|
|
self.setLayout(layout)
|
|
|
|
def connect_to_server(self, server, port, channels):
|
|
if self.worker:
|
|
self.disconnect_server()
|
|
self.worker = IRCWorker(server, port, self.nick, channels)
|
|
self.worker.message_received.connect(self.display_message)
|
|
self.worker.system_message.connect(self.system_message)
|
|
self.worker.user_list_update.connect(self.update_user_list)
|
|
self.worker.disconnected.connect(self.handle_disconnect)
|
|
self.worker.start()
|
|
self.info_label.setText(f"Connected to {server} as {self.nick}")
|
|
self.system_message(f"Connected to {server}")
|
|
|
|
def disconnect_server(self):
|
|
if self.worker:
|
|
self.worker.stop()
|
|
self.worker = None
|
|
self.system_message("Disconnected from server.")
|
|
self.info_label.setText("Disconnected")
|
|
self.user_list.clear()
|
|
|
|
def handle_disconnect(self):
|
|
self.worker = None
|
|
self.system_message("Connection closed by server.")
|
|
self.info_label.setText("Disconnected")
|
|
self.user_list.clear()
|
|
|
|
def server_selected(self, item):
|
|
selected_server = item.text()
|
|
fav = next((f for f in self.favorites if f["server"] == selected_server), None)
|
|
if fav:
|
|
self.connect_to_server(fav["server"], fav.get("port", 6667), fav["channels"])
|
|
|
|
def add_server(self):
|
|
dialog = AddServerDialog()
|
|
if dialog.exec() == QDialog.Accepted:
|
|
server, port, channels = dialog.get_data()
|
|
if not server:
|
|
QMessageBox.warning(self, "Error", "Server cannot be empty.")
|
|
return
|
|
new_server = {"server": server, "port": port, "channels": channels}
|
|
self.favorites.append(new_server)
|
|
self.save_config()
|
|
self.server_list.addItem(server)
|
|
self.connect_to_server(server, port, channels)
|
|
|
|
def display_message(self, msg):
|
|
timestamp = datetime.now().strftime("[%H:%M:%S]")
|
|
self.chat_display.append(f"{timestamp} {msg}")
|
|
|
|
def system_message(self, msg):
|
|
timestamp = datetime.now().strftime("[%H:%M:%S]")
|
|
self.chat_display.append(
|
|
f'<div style="background-color:#cce4ff; padding:2px; border-radius:3px;">{timestamp} [System] {msg}</div>'
|
|
)
|
|
|
|
def update_user_list(self, users):
|
|
self.user_list.clear()
|
|
admins = []
|
|
normal_users = []
|
|
|
|
for user in users:
|
|
if user.startswith("@"):
|
|
admins.append(user[1:]) # remove @ for display
|
|
elif user.startswith("+"):
|
|
normal_users.append(user[1:]) # optional remove +
|
|
else:
|
|
normal_users.append(user)
|
|
|
|
admins.sort()
|
|
normal_users.sort()
|
|
ordered_users = admins + normal_users
|
|
|
|
for user in ordered_users:
|
|
item = QListWidgetItem(user)
|
|
if user in admins:
|
|
item.setIcon(QIcon("admin.png"))
|
|
else:
|
|
item.setIcon(QIcon("user.png"))
|
|
self.user_list.addItem(item)
|
|
|
|
def send_message(self):
|
|
text = self.message_input.text().strip()
|
|
if not text:
|
|
return
|
|
if self.worker and self.worker.channels:
|
|
self.worker.send_message(self.worker.channels[0], text)
|
|
self.display_message(f"<{self.nick}> {text}")
|
|
else:
|
|
self.system_message("Not connected to a server.")
|
|
self.message_input.clear()
|
|
|
|
def closeEvent(self, event):
|
|
if self.worker:
|
|
self.worker.stop()
|
|
super().closeEvent(event)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app = QApplication(sys.argv)
|
|
client = PIRCClient()
|
|
client.show()
|
|
sys.exit(app.exec())
|