changed to more object-y style

Luke Ogburn
2022-04-05 21:02:35 -04:00
parent bf8d251c21
commit 64888cea1e
6 changed files with 340 additions and 266 deletions

.gitignore

@@ -1,4 +1,3 @@
save/
temp/
*/
config.json
savefile.json


@@ -1,269 +1,45 @@
from mastodon import Mastodon
import praw
import requests
import os
import time
from bot import bot
from scraper import scraper
import json
import logging
import time
'''
TODO:
for deployment:
- [x] Keep track of what has been scraped and tooted to not duplicate posts
- [x] Download and posting of video files
- [x] Make sure text-only reddit posts work
- [x] Eternal looping, run script every 5 mins or something
- [x] Different masto post structures for different post types (videos need links)
done:
- [x] Download and posting of video files
- [x] Make sure text-only reddit posts work
- [x] Eternal looping, run script every 5 mins or something
- [x] Different masto post structures for different post types (videos need links)
- [x] Import bot/scraper settings from file for automation
- [x] Random post if low activity
likely:
- […] Keep track of what has been scraped and tooted to not duplicate posts
- […] Separate methods to make code cleaner
- […] Debugging logging
- [ ] Move all vars into config
- [ ] Docker image
extras:
- [x] Import bot/scraper settings from file for automation
- [ ] Updating from @mention toot
- [ ] Improve debugging logging
- [ ] Info logging
- [ ] Error logging
- [ ] Add twitter bot
- [ ] Docker image?
- [ ] Make this an installable (pip?) package
unlikely:
- [ ] Updating from @mention toot
- [ ] Make this an installable (pip?) package
- [ ] Add twitter bot
'''
#
def run(masto, service):
# post any new posts, up to limit
print("Scraping")
subs = service.scrape_all()
print("Tooting if necessary")
for sub in subs:
print(f" Tooting {sub}")
service.toot_posts(masto, subs[sub])
print("Remembering")
service.remember()
# Mastodon bot to post things
class bot():
def __init__(self, config, debug=False):
self.debug = debug
self.masto = Mastodon(access_token=config["mastodon"]["access_token"], api_base_url=config["mastodon"]["host"])
# uploads media to mastodon, returns the mastodon ID
# specify mimetype of video files as "video/mp4" to avoid error
def upload_media(self, filename, mimetype=None):
if self.debug: logging.info(f"Uploading media {filename}")
return self.masto.media_post(filename, mime_type=mimetype)
# uploads all given media
def upload_all_media(self, filenames):
ids = []
for fn in filenames:
ids.append(self.upload_media(fn))
return ids
def toot(self, text, media=None):
if self.debug: logging.info(f"Posting:\n Text: {text}\n Media: {', '.join(media) if media != None else 'None'}")
self.masto.status_post(text, media_ids=media)
# Reddit (maybe more in future) scraper to get posts
# parameters:
# service: one of ["reddit"]
# config: dict of config variables
class scraper():
def __init__(self, service, config, debug=False):
# dev
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)
self.current_services = ["reddit"]
# error checking
if service.lower() not in self.current_services:
logging.error("Service invalid")
return None
# login to service
if service == "reddit":
self.login = praw.Reddit(
client_id=config["reddit"]["client_id"],
client_secret=config["reddit"]["client_secret"],
password=config["reddit"]["password"],
user_agent=config["reddit"]["user_agent"],
username=config["reddit"]["username"])
# make sure necessary filestructure is in place
needed_directories = ["temp", "save", f"save/{service}"]
for d in needed_directories:
if not os.path.isdir(d): os.mkdir(d)
if not os.path.exists(f"save/{service}"):
f = open(f"save/{service}", "w+")
f.close()
# set object variables
self.service = service
self.debug = debug
self.places = config[service]["places"]
# seent it list is a little more complicated
self.seent = {}
for f in os.listdir(f"save/{service}"):
savefile = open(f"save/{service}/{f}", "r").read().split("\n")
self.seent[f.split("/")[-1]] = [item for item in savefile] # dict faster
### HELPER METHODS
# helper method to clean out folder (delete all contents)
# expected structure: [["temp/a/1", "temp/a/2"], [], [], ["temp/e/1"]]
def remove_folders(self, folders_list):
for folder in folders_list:
if self.debug: logging.info(f"Clearing folder {folder}")
for file in folder:
os.remove(file)
if len(folder) > 0:
subfolder = "/".join(folder[0].split("/")[:-1])
os.rmdir(subfolder)
# helper method to download media
def download_media(self, url, filename):
# get file first
if self.debug: logging.info(f"Downloading {url} into {filename}")
resp = requests.get(url)
if resp.ok:
# make sure directory structure exists
structure = filename.split("/")
for i in range(len(structure[:-1])):
d = "/".join(structure[0:i+1])
if not os.path.isdir(d): os.mkdir(d)
# write the downloaded content to file
with open(filename, "wb+") as f:
f.write(resp.content)
# reddit helper method to return the post type
def get_post_type(self, post):
if post.url[8] == 'i': return "image"
if post.url[8] == 'v': return "video"
if post.url[23:30] == "gallery": return "gallery"
return "unknown"
# helper to save a list with a limit to a savefile
def create_savefile(self, places, limit):
# write to seent list memory and return posts
for place in places:
if self.debug: logging.info(f"Creating savefile save/{self.service}/{place}")
new_seent = [k for k in self.seent[place] if k != ""]
if len(new_seent) > limit: new_seent = new_seent[:limit]
open(f"save/{self.service}/{place}", "w").write("\n".join(new_seent))
### REDDIT METHODS
# gets posts from a given subreddit
def reddit_scrape(self, sub_name, limit):
# make sure seent list can store files for this sub
if sub_name not in self.seent:
self.seent[sub_name] = []
if not os.path.exists(f"save/{self.service}/{sub_name}"):
f = open(f"save/{self.service}/{sub_name}", "w+")
f.close()
# get posts that aren't in seent list
post_list = []
for p in self.login.subreddit(sub_name).new(limit=limit):
if p.id not in self.seent[sub_name]:
if self.debug: logging.info(f"Scraping post {p.id}")
post_list.append(p)
self.seent[sub_name] = [p.id] + self.seent[sub_name]
return post_list
# gets posts from all subreddits
def reddit_scrape_all(self, sub_names, limit):
subposts = {}
for sub in sub_names:
subposts[sub] = self.reddit_scrape(sub, limit)
return subposts
# downloads a given post; media is stored in temp/post_id/n
# returns a list of the stored file locations for that post
def reddit_download(self, post):
def make_gallery_urls():
nonlocal post
urls = []
for m in post.media_metadata:
mimetype = post.media_metadata[m]["m"]
end = mimetype[mimetype.find("/")+1:]
urls.append(f"https://i.redd.it/{m}.{end}")
return urls
# get the media URLs in array
reddit_urls = []
post_type = self.get_post_type(post)
if post_type == "image":
reddit_urls = [post.url]
elif post_type == "video":
raw_url = post.media["reddit_video"]["fallback_url"]
reddit_urls = [raw_url[:raw_url.find("?")]]
elif post_type == "gallery":
reddit_urls = make_gallery_urls()
# download all media
local_urls = []
i = 0
for url in reddit_urls:
i += 1
name = f"temp/{post.id}/{i}"
if self.debug: logging.info(f"Downloading {url} ({i}/{len(reddit_urls)})")
self.download_media(url, name)
local_urls.append(name)
return local_urls
# uses reddit_download to get all posts' media in a list of posts
# takes a list of posts, not a list of subs
# returns a list of lists, one list per post containing the local download locations for that post
def reddit_download_all(self, posts):
image_locations = []
for post in posts:
image_locations.append(self.download(post))
return image_locations
### WRAPPER METHODS; these should be the ones called directly
# gets posts from a given service's place (ie, a subreddit or twitter feed)
def scrape(self, place, limit=10):
if self.debug: logging.info(f"Scraping {self.service}: {place}... ")
if self.service == "reddit":
result = self.reddit_scrape(place, limit)
if self.debug: logging.info(f"Done scraping {self.service}: {place}.")
return result
# gets posts from a given service's places (ie, multiple subreddits or feeds)
def scrape_all(self, places=None, limit=10):
if places == None: places = self.places
if self.service == "reddit":
result = self.reddit_scrape_all(places, limit)
return result
# downloads a given post's media and return the locations
def download(self, post):
if self.service == "reddit":
if self.debug: logging.info(f"Downloading {post.id}... ")
result = self.reddit_download(post)
if self.debug: logging.info(f"Done downloading {post.id}.")
return result
# downloads a list of posts' media and returns a list of the locations
def download_all(self, posts):
if self.service == "reddit":
post_ids = [p.id for p in posts]
result = self.reddit_download_all(posts)
return result
# creates the savefile for a list of posts.
def remember(self, places=None, limit=10):
if places == None: places = self.places
if self.debug: logging.info(f"Remembering {', '.join(places)}...")
self.create_savefile(places, limit)
if self.debug: logging.info(f"Remembered {', '.join(places)}.")
### TOOTER METHODS (reddit only for now)
# builds a toot for convenience
def build_toot(self, masto, post):
toot = {}
toot["text"] = post.title
if self.get_post_type(post) == "video": toot["text"] += f"\n\n{post.url}"
local_media = self.download(post)
toot["media"] = masto.upload_all_media(local_media)
return toot
# toots all posts in list
def toot_posts(self, masto, posts):
for post in posts:
to_toot = self.build_toot(masto, post)
masto.toot(to_toot["text"], to_toot["media"])
return True
### RUNNING METHODS
def run(self, masto, places=None, limit=10):
if self.debug: logging.info(f"Running {self.service}.")
if places == None: places = self.places
subs = self.scrape_all(places=places, limit=limit)
for sub in subs:
self.toot_posts(masto, subs[sub])
self.remember()
# post random if it has been a while
print("Keeping lively")
service.keep_lively()
def main():
while True:
@@ -271,12 +47,11 @@ def main():
config = json.load(open('config.json', 'r'))
# make bots
masto = bot(config)
reddit = scraper("reddit", config, debug=True)
reddit = scraper("reddit", config, low_activity_random=True)
# run bots
reddit.run(masto)
# buffer time bc posts only happen so often
time.sleep(60)
run(masto, reddit)
# buffer time bc posts only happen so often so why check
time.sleep(5)
if __name__ == "__main__":
main()
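
For reference, here is a minimal sketch of the config.json layout the constructors above appear to expect, based on the keys they read (config["mastodon"]["access_token"], config["mastodon"]["host"], and the config["reddit"] credentials plus "places"); every value below is a placeholder and the subreddit names are only examples.

import json

# placeholder config matching the keys read by bot() and scraper();
# all values here are assumptions, not real credentials
example_config = {
    "mastodon": {
        "access_token": "YOUR_MASTODON_ACCESS_TOKEN",
        "host": "https://mastodon.example"
    },
    "reddit": {
        "client_id": "YOUR_CLIENT_ID",
        "client_secret": "YOUR_CLIENT_SECRET",
        "username": "YOUR_USERNAME",
        "password": "YOUR_PASSWORD",
        "user_agent": "reddit-to-mastodon bot by u/YOUR_USERNAME",
        "places": ["aww", "EarthPorn"]
    }
}

# write the placeholder config next to the scripts
with open("config.json", "w") as f:
    json.dump(example_config, f, indent=2)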

bot.py (new file)

@@ -0,0 +1,25 @@
from mastodon import Mastodon
import logging
# Mastodon bot to post things
class bot():
def __init__(self, config, debug=False):
self.debug = debug
self.masto = Mastodon(access_token=config["mastodon"]["access_token"], api_base_url=config["mastodon"]["host"])
# uploads media to mastodon, returns the mastodon ID
# specify mimetype of video files as "video/mp4" to avoid error
def upload_media(self, filename, mimetype=None):
logging.info(f"Uploading media {filename}")
return self.masto.media_post(filename, mime_type=mimetype)
# uploads all given media
def upload_all_media(self, filenames):
ids = []
for fn in filenames:
ids.append(self.upload_media(fn))
return ids
def toot(self, text, media=None):
logging.info(f"Posting:\n Text: {text}")
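# the actual status_post call is printed rather than executed for now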
print("self.masto.status_post(text, media_ids=media)")
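
A quick usage sketch of the new bot class, assuming a config.json shaped like the example above; the media paths are hypothetical, and because toot() currently prints the status_post call instead of sending it, this behaves as a dry run.

import json
from bot import bot

# log in with the mastodon credentials from config.json
config = json.load(open("config.json", "r"))
masto = bot(config)

# upload previously downloaded media (hypothetical temp/ paths), then "toot"
media_ids = masto.upload_all_media(["temp/abc123/1", "temp/abc123/2"])
masto.toot("Example post title", media=media_ids)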

helper.py (new file)

@@ -0,0 +1,71 @@
import requests
import os
import logging
from datetime import datetime
### HELPER METHODS
class helper():
def __init__(self, service):
# copy the service's variables onto the helper so they're easy
# to access and the service doesn't have to pass itself in every time
self.service = service.service
self.low_activity_random = service.low_activity_random
self.debug = service.debug
self.places = service.places
self.seent = service.seent
# helper method to clean out folders (delete all contents)
# expected structure: [["temp/a/1", "temp/a/2"], [], [], ["temp/e/1"]]
def remove_folders(folders_list):
for folder in folders_list:
logging.info(f"Clearing folder {folder}")
for file in folder:
os.remove(file)
if len(folder) > 0:
subfolder = "/".join(folder[0].split("/")[:-1])
os.rmdir(subfolder)
# helper method to download media
def download_media(url, filename):
# get file first
logging.info(f"Downloading {url} into {filename}")
resp = requests.get(url)
if resp.ok:
# make sure directory structure exists
structure = filename.split("/")
for i in range(len(structure[:-1])):
d = "/".join(structure[0:i+1])
if not os.path.isdir(d): os.mkdir(d)
# write the downloaded content to file
with open(filename, "wb+") as f:
f.write(resp.content)
# reddit helper method to return the post type
def get_post_type(post):
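# crude URL check: i.redd.it links are images, v.redd.it links are videos,
# and reddit.com/gallery/... links are galleries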
print(post.url)
if post.url[8] == 'i': return "image"
if post.url[8] == 'v': return "video"
if post.url[23:30] == "gallery": return "gallery"
return "unknown"
# returns True if ts1 is more than ts2 seconds in the past
# ts1 should be a timestamp; ts2 is a duration in seconds
def ts_older(ts1, ts2):
# the ts2-second window expressed as a timedelta
hours_delta = datetime.fromtimestamp(ts2) - datetime.fromtimestamp(0)
# how long ago ts1 was
stamp_delta = datetime.fromtimestamp(ts1)
stamp_delta = datetime.now() - stamp_delta
return stamp_delta > hours_delta
# returns True if place hasn't had a post in the past 12 hours according
# to the savefile
def been_awhile(seent_time, hours=12):
long_time = 60 * 60 * hours
return helper.ts_older(int(seent_time), long_time)
# takes in a ListingGenerator of reddit posts and converts it
# to a plain list (callers reverse it themselves where needed)
def reddit_listify(LG):
return [p for p in LG]
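
To make the timestamp helpers concrete, a small illustration of ts_older() and been_awhile(); the timestamps are made up and only the relative ages matter.

import time
from helper import helper

now = time.time()
three_hours_ago = now - 3 * 60 * 60
two_days_ago = now - 2 * 24 * 60 * 60

# ts_older(ts1, ts2) is True when ts1 lies more than ts2 seconds in the past
print(helper.ts_older(three_hours_ago, 60 * 60))  # True: older than one hour
# been_awhile() wraps ts_older() with a default window of 12 hours
print(helper.been_awhile(three_hours_ago))        # False: active within 12 hours
print(helper.been_awhile(two_days_ago))           # True: quiet for two days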

reddit.py (new file)

@@ -0,0 +1,129 @@
from helper import helper
import praw
import json
import time
import logging
class reddit_scraper:
def __init__(self, config):
self.login = praw.Reddit(
client_id=config["reddit"]["client_id"],
client_secret=config["reddit"]["client_secret"],
password=config["reddit"]["password"],
user_agent=config["reddit"]["user_agent"],
username=config["reddit"]["username"])
self.places = config["reddit"]["places"]
savefile = json.load(open("savefile.json", "r"))
try: self.seent = savefile["reddit"]
except KeyError: self.seent = {}
### REDDIT METHODS
# gets posts from a given subreddit
def scrape(self, sub, limit):
# make sure self.seent has the sub, add if not
if sub not in self.seent: self.seent[sub] = time.time()
# get posts that aren't in seent list
post_list = []
posts = self.login.subreddit(sub).new(limit=limit)
posts = helper.reddit_listify(posts)
for p in posts[::-1]:
if helper.ts_older(p.created, self.seent[sub]):
break
logging.info(f"Scraping post {p.id}")
post_list.append(p)
self.seent[sub] = p.created
return post_list
# scrapes all subreddits
def scrape_all(self, limit):
subposts = {}
for place in self.places:
subposts[place] = self.scrape(place, limit)
return subposts
# downloads a given post; media is stored in temp/post_id/n
# returns a list of the stored file locations for that post
def download(self, post):
def make_gallery_urls():
nonlocal post
urls = []
for m in post.media_metadata:
mimetype = post.media_metadata[m]["m"]
end = mimetype[mimetype.find("/")+1:]
urls.append(f"https://i.redd.it/{m}.{end}")
return urls
# video is sketchy, sorta WIP but maybe impossible
# to have consistently. this function does its best
def try_video_urls(post):
try:
raw_url = post.media["video"]["fallback_url"]
return [raw_url[:raw_url.find("?")]]
except Exception:
try:
raw_url = post.media["reddit_video"]["fallback_url"]
return [raw_url[:raw_url.find("?")]]
except Exception:
return []
return [] # should never be reached but just in case
# get the media URLs in array
urls = []
post_type = helper.get_post_type(post)
if post_type == "image":
urls = [post.url]
elif post_type == "video":
urls = try_video_urls(post)
elif post_type == "gallery":
urls = make_gallery_urls()
urls = [] # neuter gallery downloads for now
# download all media
local_urls = []
i = 0
for url in urls:
i += 1
name = f"temp/{post.id}/{i}"
logging.info(f"Downloading {url} ({i}/{len(urls)})")
helper.download_media(url, name)
local_urls.append(name)
return local_urls
# keeps things lively: checks each sub and pulls a random post for any that have been quiet for a while
def keep_lively(self):
for sub in self.places:
if helper.been_awhile(self.seent[sub]):
self.random_post(sub)
# gets a random post from reddit
def random_post(self, place):
return self.login.subreddit(place).random()
# creates the savefile for a list of posts.
def remember(self):
print(f"{self.seent}")
savefile = json.load(open("savefile.json", "r"))
savefile["reddit"] = self.seent
savefile = json.dumps(savefile)
with open("savefile.json", "w") as f:
f.write(savefile)
### TOOTER METHODS
# takes a post and returns a dict of the toot text and media IDs
def build_toot(self, masto, post):
toot = {}
toot["text"] = post.title
if helper.get_post_type(post) == "video": toot["text"] += f"\n\n{post.url}"
local_media = self.download(post)
toot["media"] = masto.upload_all_media(local_media)
return toot
# toots all posts in list
def toot_posts(self, masto, posts):
for post in posts:
to_toot = self.build_toot(masto, post)
masto.toot(to_toot["text"], to_toot["media"])
return True
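
A rough sketch of driving reddit_scraper directly, outside the scraper wrapper; it assumes a valid config.json and an existing savefile.json (the wrapper normally creates the latter), and the subreddit names come from config["reddit"]["places"].

import json
from reddit import reddit_scraper

config = json.load(open("config.json", "r"))
r = reddit_scraper(config)

# scrape every configured sub and download whatever media each post carries
new_posts = r.scrape_all(limit=10)          # {"subname": [post, ...], ...}
for sub, posts in new_posts.items():
    for post in posts:
        local_files = r.download(post)      # ["temp/<post id>/1", ...]
        print(sub, post.id, local_files)

# persist the newest-seen timestamps back to savefile.json
r.remember()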

scraper.py (new file)

@@ -0,0 +1,75 @@
import os
import logging
import json
from reddit import reddit_scraper as reddit
class scraper:
def __init__(self, service, config, low_activity_random=False):
# error checking
scrapers = ["reddit"]
if service.lower() not in scrapers:
logging.error(f"Scraper {service} invalid. Choose one of {', '.join(scrapers)}")
return None
# make sure necessary filestructure is in place
if not os.path.isdir("temp"): os.mkdir("temp")
if not os.path.exists("savefile.json"):
f = open("savefile.json", "w+")
f.write("{}")
f.close()
# set object variables
self.low_activity_random = low_activity_random
self.service = service
# login to service
if service == "reddit": self.login = reddit(config)
### WRAPPER METHODS
def scrape(self, place, limit=10):
logging.info(f"Scraping {self.service}: {place}... ")
result = self.login.scrape(place, limit)
logging.info(f"Done scraping {self.service}: {place}.")
return result
# gets posts from a given service's places (ie, multiple subreddits or feeds)
def scrape_all(self, limit=10):
return self.login.scrape_all(limit)
# downloads a given post's media and return the locations
def download(self, post):
logging.info(f"Downloading {post.id}... ")
result = self.login.download(post)
logging.info(f"Done downloading {post.id}.")
return result
# downloads a list of posts' media and returns a list of the locations
def download_all(self, posts):
locations = []
# download expects full post objects, not just their IDs
for post in posts:
locations.append(self.login.download(post))
return locations
# creates the savefile for a list of posts.
def remember(self):
logging.info(f"Remembering {self.service}...")
self.login.remember()
logging.info(f"Remembered {self.service}.")
# posts for each place if it has been a while
def keep_lively(self):
self.login.keep_lively()
# posts a random post from the given place
def random_post(self, place):
logging.info(f"Getting random post for {place}")
return self.login.random_post(place)
### TOOTER METHODS
# takes a post and returns a dict of the toot text and media IDs
def build_toot(self, masto, post):
return self.login.build_toot(masto, post)
# toots all posts in list
def toot_posts(self, masto, posts):
for post in posts:
to_toot = self.build_toot(masto, post)
masto.toot(to_toot["text"], to_toot["media"])
return True
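
For completeness, a sketch of the shape remember() leaves behind in savefile.json: per service, the newest post timestamp seen for each place. The sub names and timestamp values here are illustrative only.

import json

# illustrative savefile.json contents after a run: one Unix timestamp per
# scraped place, keyed under the service name
example_savefile = {
    "reddit": {
        "aww": 1649199755.0,
        "EarthPorn": 1649198012.0
    }
}
print(json.dumps(example_savefile, indent=2))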