diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..50f38bc --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +test.dat +socks/__pycache__/ diff --git a/rtbw.py b/rtbw.py index 9363b06..0f84864 100644 --- a/rtbw.py +++ b/rtbw.py @@ -9,22 +9,30 @@ import struct import pprint import time import datetime +import threading import json import re import time from threading import Thread, Lock import binascii import pylzma -from cffi import FFI import select from subprocess import Popen, PIPE import argparse + +from socks.transmission import SocketOverlay +from socks.transmission import Command + try: import ssl except ImportError: print ("error: no ssl support") import requests +DEBUG=False + +ShowLog=True + def __pmap(): return { "com": "c", @@ -44,7 +52,8 @@ def __pmap(): def log(stre): - print (stre) + if(ShowLog): + print (stre) def encode_post(post): mape = __pmap() @@ -90,36 +99,31 @@ class StatBuffer: return data else: return decode_post(data) + def _strip(self, post, string): + if string in post: + del post[string] + post["has_"+string] = True def _encode(self, post, striplv): if(striplv == StatBuffer.SLV_LOW): return encode_post(post) elif(striplv == StatBuffer.SLV_NOTEXT): - if "com" in post: - del post["com"] - if "sub" in post: - del post["sub"] + self._strip(post, "com") + self._strip(post, "sub") return encode_post(post) elif(striplv == StatBuffer.SLV_NOPI): - if "com" in post: - del post["com"] - if "sub" in post: - del post["sub"] - if "name" in post: - del post["name"] - if "trip" in post: - del post["trip"] + self._strip(post, "com") + self._strip(post, "sub") + self._strip(post, "name") + self._strip(post, "trip") return encode_post(post) elif(striplv == StatBuffer.SLV_NOUI): - if "com" in post: - del post["com"] - if "sub" in post: - del post["sub"] - if "name" in post: - del post["name"] - if "trip" in post: - del post["trip"] + self._strip(post, "com") + self._strip(post, "sub") + self._strip(post, "name") + self._strip(post, 
class Daemon(threading.Thread):
    """Thread that answers control commands arriving on a listening socket.

    Every accepted connection is wrapped in a ``SocketOverlay``, exactly one
    serialised ``Command`` is read from it, the command is executed against
    the buffer, and the connection is closed again.  Only ``CMD_GET`` sends
    a reply (the JSON-encoded result of ``buf.readno``).
    """

    def __init__(self, socket, buf):
        """Remember the listening socket and the buffer to serve.

        socket -- listening socket object; ``accept()`` is called on it.
        buf    -- buffer object; ``readno(no)`` and ``clear()`` are called
                  on it.
        """
        threading.Thread.__init__(self)
        self.sock = socket
        self.buf = buf
        self.running = True

    def _get(self, con, fr):
        """Send the JSON dump of ``buf.readno(fr)`` back over *con*."""
        log("[daemon]: Recieved get from "+str(fr))
        data = self.buf.readno(fr)
        con.send(json.dumps(data).encode("utf-8"))

    def _handle(self, con):
        """Read a single command from *con* and execute it.

        A request that cannot be decoded (peer hung up early, truncated
        frame, ``CMD_GET`` without its integer payload) is logged and
        dropped so that one bad client cannot take the daemon down.
        """
        floor = None
        try:
            cmd = Command.unserialise(con.recv())
            if cmd.uCommand == Command.CMD_GET:
                # The payload is a native "L" as packed by the control tool.
                floor = struct.unpack("L", cmd.uData)[0]
        except (struct.error, TypeError, ConnectionError):
            log("[daemon]: Received malformed command")
            return

        if cmd.uCommand == Command.CMD_SHUTDOWN:
            log("[daemon]: Recieved shutdown")
            self.running = False
        elif cmd.uCommand == Command.CMD_GET:
            self._get(con, floor)
        elif cmd.uCommand == Command.CMD_CLEAR:
            log("[daemon]: Recieved clear")
            self.buf.clear()
        else:
            log("[daemon]: Recieved unknown command")

    def run(self):
        """Accept and serve connections until ``running`` becomes false.

        ``socket.timeout`` is treated as "nothing happened" and the loop
        simply re-checks ``running``.  Any other exception stops the daemon
        and propagates, but the listening socket is still closed and
        ``running`` is still cleared on the way out.
        """
        try:
            while self.running:
                con = None
                try:
                    con = SocketOverlay(self.sock.accept()[0])
                    if not self.running:
                        # close() was requested while we were blocked in
                        # accept(); drop the connection unanswered.
                        break
                    log("[daemon]: Connection accepted")
                    self._handle(con)
                except socket.timeout:
                    pass
                finally:
                    # Always release the client connection, including on
                    # timeouts, errors and the early break above.
                    if con is not None:
                        con.close()
        finally:
            self.running = False
            log("[daemon]: Exiting")
            self.sock.close()
            self.sock = None

    def close(self):
        """Ask the thread to stop.

        Only clears the ``running`` flag; the thread notices it the next
        time ``accept()`` returns or times out.
        """
        log("[daemon-ctl]: Shutting down")
        self.running = False
parser.add_argument("timeout", help="Time between cycles") parser.add_argument("--buffer", help="Save buffer filename (default: use in memory buffer)", default=None) -parser.add_argument("--daemon", action="store_true", help="Run as daemon") +parser.add_argument("--daemon", metavar="Socket", help="Run as daemon", default=None) parser.add_argument("--api", help="Base URL of 4chan JSON API", default="http://api.4chan.org/%s/") +parser.add_argument("--debug", default=False, action="store_true") args = parser.parse_args() + +DEBUG = args.debug + +StripLevel = StatBuffer.SLV_NOTEXT + last=0 buf = None if args.buffer !=None: - buf = FileBuffer(args.buffer, StatBuffer.SLV_NOTEXT) + buf = FileBuffer(args.buffer, StripLevel) else: - buf = MemoryBuffer(StatBuffer.SLV_NOTEXT) + buf = MemoryBuffer(StripLevel) last = buf.findMax() +runForever=True +daemon = None + +if args.daemon!=None: + pid = _fork() + if not pid == 0: + log("Process forked to background: PID %d" % pid) + sys.exit(0) + else: + runForever=False + if not DEBUG: + ShowLog=False + daemon_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + daemon_sock.bind(args.daemon) + daemon_sock.settimeout(5) + daemon_sock.listen() + + daemon = Daemon(daemon_sock, buf) + daemon.start() + try: - while True: + while runForever or daemon.running: log("Reading threads for %s from %d" % (args.board, last)) posts = parse_page(args.api, args.board, "1", last) last = pnomax(last, posts) @@ -371,5 +453,7 @@ try: time.sleep(int(args.timeout)) except(KeyboardInterrupt): log("Interrupt detected") + if daemon!=None: + daemon.close() buf.close() log("Closing") diff --git a/rtbwctl.py b/rtbwctl.py new file mode 100644 index 0000000..398cd87 --- /dev/null +++ b/rtbwctl.py @@ -0,0 +1,66 @@ +import sys +import os +import os.path +import traceback +import json +import socket +import struct +import pprint +import time +import datetime +import threading +import json +import re +import time +from threading import Thread, Lock +import 
class SocketOverlay(object):
    """Length-prefixed message framing on top of a connected stream socket.

    Each message on the wire is a native-order unsigned int (``struct``
    format ``"I"``) holding the payload length, followed by the payload.
    """

    _HEADER = struct.Struct("I")

    def __init__(self, socket):
        """Wrap *socket*, an object providing ``recv``, ``sendall``, ``close``."""
        self.socket = socket

    def _recv_exact(self, count):
        """Return exactly *count* bytes from the socket.

        A single ``recv`` call on a stream socket may return fewer bytes
        than requested, so keep reading until the quota is filled.

        Raises ConnectionError if the peer closes the stream first.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    "connection closed with %d byte(s) still expected" % remaining
                )
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def recv(self):
        """Read one framed message and return its payload as ``bytes``.

        Raises ConnectionError if the stream ends before the whole frame
        has arrived.
        """
        (length,) = self._HEADER.unpack(self._recv_exact(self._HEADER.size))
        return self._recv_exact(length)

    def send(self, data):
        """Write *data* (a bytes-like object) as one framed message."""
        # sendall() retries until everything is written; plain send() may
        # stop after a partial write and silently corrupt the framing.
        self.socket.sendall(self._HEADER.pack(len(data)))
        self.socket.sendall(data)

    def close(self):
        """Close the underlying socket."""
        self.socket.close()


class Command(object):
    """A command code plus optional payload, with a simple binary encoding.

    Encoding: command code and payload length, each a native-order unsigned
    int (``struct`` format ``"I"``), followed by the payload bytes.
    """

    CMD_GET = 1
    CMD_CLEAR = 2
    CMD_SHUTDOWN = 3

    _WORD = struct.Struct("I")

    def __init__(self):
        self.uCommand = 0
        self.uData = None

    def serialise(self):
        """Return the binary encoding of this command as ``bytes``.

        ``uData`` may be ``None`` (encoded as a zero-length payload),
        a bytes-like object, or a ``str``, which is encoded as UTF-8.
        """
        payload = self.uData
        if payload is None:
            payload = b""
        elif isinstance(payload, str):
            # Text handed over from a command line would otherwise fail on
            # the bytes concatenation below.
            payload = payload.encode("utf-8")
        head = self._WORD.pack(self.uCommand) + self._WORD.pack(len(payload))
        return head + payload

    @classmethod
    def build(cls, command, data=None):
        """Create a command with code *command* and optional payload *data*."""
        c = cls()
        c.uCommand = command
        c.uData = data
        return c

    @classmethod
    def unserialise(cls, data):
        """Decode *data* as produced by ``serialise``.

        A zero-length payload yields ``uData`` of ``None``.

        Raises struct.error if *data* is shorter than the 8-byte header.
        """
        word = cls._WORD.size
        cmd = cls._WORD.unpack(data[:word])[0]
        ds = cls._WORD.unpack(data[word:2 * word])[0]
        if ds < 1:
            return cls.build(cmd)
        return cls.build(cmd, data[2 * word:2 * word + ds])