Added daemon, improved stripping

master
Ringo Wantanabe 6 years ago
parent 1a9c4a5698
commit 6f4a00b079
No known key found for this signature in database
GPG Key ID: C1C1CD34CF2907B2

2
.gitignore vendored

@ -0,0 +1,2 @@
test.dat
socks/__pycache__/

@ -9,22 +9,30 @@ import struct
import pprint import pprint
import time import time
import datetime import datetime
import threading
import json import json
import re import re
import time import time
from threading import Thread, Lock from threading import Thread, Lock
import binascii import binascii
import pylzma import pylzma
from cffi import FFI
import select import select
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
import argparse import argparse
from socks.transmission import SocketOverlay
from socks.transmission import Command
try: try:
import ssl import ssl
except ImportError: except ImportError:
print ("error: no ssl support") print ("error: no ssl support")
import requests import requests
DEBUG=False
ShowLog=True
def __pmap(): def __pmap():
return { return {
"com": "c", "com": "c",
@ -44,7 +52,8 @@ def __pmap():
def log(stre): def log(stre):
print (stre) if(ShowLog):
print (stre)
def encode_post(post): def encode_post(post):
mape = __pmap() mape = __pmap()
@ -90,36 +99,31 @@ class StatBuffer:
return data return data
else: else:
return decode_post(data) return decode_post(data)
def _strip(self, post, string):
if string in post:
del post[string]
post["has_"+string] = True
def _encode(self, post, striplv): def _encode(self, post, striplv):
if(striplv == StatBuffer.SLV_LOW): if(striplv == StatBuffer.SLV_LOW):
return encode_post(post) return encode_post(post)
elif(striplv == StatBuffer.SLV_NOTEXT): elif(striplv == StatBuffer.SLV_NOTEXT):
if "com" in post: self._strip(post, "com")
del post["com"] self._strip(post, "sub")
if "sub" in post:
del post["sub"]
return encode_post(post) return encode_post(post)
elif(striplv == StatBuffer.SLV_NOPI): elif(striplv == StatBuffer.SLV_NOPI):
if "com" in post: self._strip(post, "com")
del post["com"] self._strip(post, "sub")
if "sub" in post: self._strip(post, "name")
del post["sub"] self._strip(post, "trip")
if "name" in post:
del post["name"]
if "trip" in post:
del post["trip"]
return encode_post(post) return encode_post(post)
elif(striplv == StatBuffer.SLV_NOUI): elif(striplv == StatBuffer.SLV_NOUI):
if "com" in post: self._strip(post, "com")
del post["com"] self._strip(post, "sub")
if "sub" in post: self._strip(post, "name")
del post["sub"] self._strip(post, "trip")
if "name" in post: return encode_post(post)
del post["name"]
if "trip" in post:
del post["trip"]
if "filename" in post: if "filename" in post:
del post["filename"] self._strip(post, "filename")
del post["fileSize"] del post["fileSize"]
del post["realFilename"] del post["realFilename"]
del post["image"] del post["image"]
@ -180,7 +184,7 @@ class MemoryBuffer(StatBuffer):
while(nl>=0): while(nl>=0):
entry = super()._decode(self.store[nl]) entry = super()._decode(self.store[nl])
if(entry["no"]<=floor): break if(entry["no"]<=floor): break
posts.append(ent) posts.append(entry)
nl-=1 nl-=1
super()._unlock() super()._unlock()
return posts return posts
@ -336,27 +340,105 @@ def pnomax(last, posts):
def buffer_write(buf, posts): def buffer_write(buf, posts):
for post in posts: for post in posts:
buf.write(post) buf.write(post)
def _fork():
if DEBUG:
return 0
else:
return os.fork()
class Daemon(threading.Thread):
def __init__(self, socket, buf):
self.sock = socket
self.buf = buf
self.running=True
threading.Thread.__init__(self)
def _get(self,con, fr):
log("[daemon]: Recieved get from "+str(fr))
data = self.buf.readno(fr)
js = json.dumps(data)
con.send(js.encode("utf-8"))
def run(self):
while self.running:
try:
con = SocketOverlay(self.sock.accept()[0])
if not self.running:
con.close()
break
log("[daemon]: Connection accepted")
read = con.recv()
cmd = Command.unserialise(read)
if cmd.uCommand == Command.CMD_SHUTDOWN: #shut down daemon
log("[daemon]: Recieved shutdown")
self.running=False
elif cmd.uCommand == Command.CMD_GET: #receive entries from <data>
self._get(con, struct.unpack("L", cmd.uData)[0])
elif cmd.uCommand == Command.CMD_CLEAR: #clear buffer
log("[daemon]: Recieved clear")
self.buf.clear()
else: #unknwon command
log("[daemon]: Recieved unknown command")
pass
con.close()
except socket.timeout:
pass
except:
self.running=False
raise
log("[daemon]: Exiting")
self.sock.close()
self.sock=None
def close(self):
log("[daemon-ctl]: Shutting down")
self.running=False
#TODO: When we request buffer data from daemon, send a min post number to send back (last) #TODO: When we request buffer data from daemon, send a min post number to send back (last)
parser = argparse.ArgumentParser(description="Real-time 4chan board watcher.") parser = argparse.ArgumentParser(description="Real-time 4chan board watcher.")
parser.add_argument("board", help="Board to spider") parser.add_argument("board", help="Board to spider")
parser.add_argument("timeout", help="Time between cycles") parser.add_argument("timeout", help="Time between cycles")
parser.add_argument("--buffer", help="Save buffer filename (default: use in memory buffer)", default=None) parser.add_argument("--buffer", help="Save buffer filename (default: use in memory buffer)", default=None)
parser.add_argument("--daemon", action="store_true", help="Run as daemon") parser.add_argument("--daemon", metavar="Socket", help="Run as daemon", default=None)
parser.add_argument("--api", help="Base URL of 4chan JSON API", default="http://api.4chan.org/%s/") parser.add_argument("--api", help="Base URL of 4chan JSON API", default="http://api.4chan.org/%s/")
parser.add_argument("--debug", default=False, action="store_true")
args = parser.parse_args() args = parser.parse_args()
DEBUG = args.debug
StripLevel = StatBuffer.SLV_NOTEXT
last=0 last=0
buf = None buf = None
if args.buffer !=None: if args.buffer !=None:
buf = FileBuffer(args.buffer, StatBuffer.SLV_NOTEXT) buf = FileBuffer(args.buffer, StripLevel)
else: else:
buf = MemoryBuffer(StatBuffer.SLV_NOTEXT) buf = MemoryBuffer(StripLevel)
last = buf.findMax() last = buf.findMax()
runForever=True
daemon = None
if args.daemon!=None:
pid = _fork()
if not pid == 0:
log("Process forked to background: PID %d" % pid)
sys.exit(0)
else:
runForever=False
if not DEBUG:
ShowLog=False
daemon_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
daemon_sock.bind(args.daemon)
daemon_sock.settimeout(5)
daemon_sock.listen()
daemon = Daemon(daemon_sock, buf)
daemon.start()
try: try:
while True: while runForever or daemon.running:
log("Reading threads for %s from %d" % (args.board, last)) log("Reading threads for %s from %d" % (args.board, last))
posts = parse_page(args.api, args.board, "1", last) posts = parse_page(args.api, args.board, "1", last)
last = pnomax(last, posts) last = pnomax(last, posts)
@ -371,5 +453,7 @@ try:
time.sleep(int(args.timeout)) time.sleep(int(args.timeout))
except(KeyboardInterrupt): except(KeyboardInterrupt):
log("Interrupt detected") log("Interrupt detected")
if daemon!=None:
daemon.close()
buf.close() buf.close()
log("Closing") log("Closing")

@ -0,0 +1,66 @@
import sys
import os
import os.path
import traceback
import json
import socket
import struct
import pprint
import time
import datetime
import threading
import json
import re
import time
from threading import Thread, Lock
import binascii
import pylzma
import select
from subprocess import Popen, PIPE
import argparse
from socks.transmission import SocketOverlay
from socks.transmission import Command
def parsecmd(s):
if s=="get": return Command.CMD_GET
elif s=="stop": return Command.CMD_SHUTDOWN
elif s=="clear": return Command.CMD_CLEAR
else: return None
parser = argparse.ArgumentParser(description="rtbwpy daemon control.")
parser.add_argument("sock", help="AF_UNIX socket")
parser.add_argument("command", help="Command")
parser.add_argument("--data", help="Additional data to send", default=None)
args = parser.parse_args()
cmd = parsecmd(args.command)
if cmd==None:
print("Invalid command.")
os._exit(1)
data = args.data
if cmd == Command.CMD_GET:
if data== None:
data = struct.pack("L", 0)
else:
data = struct.pack("L", int(data))
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(args.sock)
con = SocketOverlay(sock)
cmd = Command.build(cmd, data)
con.send(cmd.serialise())
if cmd.uCommand == Command.CMD_GET:
json = con.recv().decode("utf-8")
print(json)
sock.close()

@ -0,0 +1,56 @@
import os
#import json
import socket
import sys
import time
import binascii
import struct
class SocketOverlay(object):
def __init__(self, socket):
self.socket = socket
def recv(self):
li = self.socket.recv(4)
l = struct.unpack("I", li)[0]
return self.socket.recv(l)
def send(self, data):
l = len(data)
self.socket.send(struct.pack("I", l))
self.socket.send(data)
def close(self):
self.socket.close()
class Command(object):
CMD_GET = 1
CMD_CLEAR = 2
CMD_SHUTDOWN = 3
def __init__(self):
self.uCommand = 0
self.uData = None
def serialise(self):
if self.uData==None:
return struct.pack("I", self.uCommand)+struct.pack("I", 0)
else:
return struct.pack("I", self.uCommand)+struct.pack("I", len(self.uData))+self.uData
@classmethod
def build(cls, command, data=None):
c = Command()
c.uCommand = command
c.uData = data
return c
@classmethod
def unserialise(cls, data):
cmd = struct.unpack("I", data[:4])[0]
ds = struct.unpack("I", data[4:8])[0]
if ds<1:
return Command.build(cmd)
else:
return Command.build(cmd, data[8:8+ds])

Binary file not shown.
Loading…
Cancel
Save