diff --git a/.gitignore b/.gitignore
index d32d6fa..72b414a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,3 @@
-save/
-temp/
+*/
 config.json
-
+savefile.json
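config.json stays gitignored, so it isn't shown in this diff, but from the keys read in bot.py and reddit.py it presumably looks something like the sketch below. All values here are placeholders:

    {
        "mastodon": {
            "access_token": "<your-mastodon-access-token>",
            "host": "https://your.instance"
        },
        "reddit": {
            "client_id": "<client-id>",
            "client_secret": "<client-secret>",
            "username": "<bot-username>",
            "password": "<bot-password>",
            "user_agent": "autotoot (by /u/<bot-username>)",
            "places": ["some_subreddit", "another_subreddit"]
        }
    }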
diff --git a/autotoot.py b/autotoot.py
index 6676338..9fa2aba 100644
--- a/autotoot.py
+++ b/autotoot.py
@@ -1,269 +1,45 @@
-from mastodon import Mastodon
-import praw
-import requests
-import os
-import time
+from bot import bot
+from scraper import scraper
 import json
-import logging
+import time

 '''
 TODO:
- for deployment:
- - [x] Keep track of what has been scraped and tooted to not duplicate posts
- - [x] Download and posting of video files
- - [x] Make sure text-only reddit posts work
- - [x] Eternal looping, run script every 5 mins or something
- - [x] Different masto post structures for different post types (videos need links)
+ done:
+ - [x] Download and posting of video files
+ - [x] Make sure text-only reddit posts work
+ - [x] Eternal looping, run script every 5 mins or something
+ - [x] Different masto post structures for different post types (videos need links)
+ - [x] Import bot/scraper settings from file for automation
+ - [x] Random post if low activity
+
+ likely:
+ - […] Keep track of what has been scraped and tooted to not duplicate posts
+ - […] Separate methods to make code cleaner
+ - […] Debugging logging
+ - [ ] Move all vars into config
+ - [ ] Docker image

- extras:
- - [x] Import bot/scraper settings from file for automation
- - [ ] Updating from @mention toot
- - [ ] Improve debugging logging
-   - [ ] Info logging
-   - [ ] Error logging
- - [ ] Add twitter bot
- - [ ] Docker image?
- - [ ] Make this an installable (pip?) package
+ unlikely:
+ - [ ] Updating from @mention toot
+ - [ ] Make this an installable (pip?) package
+ - [ ] Add twitter bot
 '''

-#
+def run(masto, service):
+    # post any new posts, up to limit
+    print("Scraping")
+    subs = service.scrape_all()
+    print("Tooting if necessary")
+    for sub in subs:
+        print(f" Tooting {sub}")
+        service.toot_posts(masto, subs[sub])
+    print("Remembering")
+    service.remember()
-# Mastodon bot to post things
-class bot():
-    def __init__(self, config, debug=False):
-        self.debug = debug
-        self.masto = Mastodon(access_token=config["mastodon"]["access_token"], api_base_url=config["mastodon"]["host"])
-
-    # uploads media to mastodon, returns the mastodon ID
-    # specify mimetype of video files as "video/mp4" to avoid error
-    def upload_media(self, filename, mimetype=None):
-        if self.debug: logging.info(f"Uploading media {filename}")
-        return self.masto.media_post(filename, mime_type=mimetype)
-
-    # uploads all given media
-    def upload_all_media(self, filenames):
-        ids = []
-        for fn in filenames:
-            ids.append(self.upload_media(fn))
-        return ids
-
-    def toot(self, text, media=None):
-        if self.debug: logging.info(f"Posting:\n  Text: {text}\n  Media: {', '.join(media) if media != None else 'None'}")
-        self.masto.status_post(text, media_ids=media)
-
-# Reddit (maybe more in future) scaper to get posts
-# parameters:
-#   service: one of ["reddit"]
-#   config: dict of config variables
-class scraper():
-    def __init__(self, service, config, debug=False):
-        # dev
-        console = logging.StreamHandler()
-        console.setLevel(logging.INFO)
-        logging.getLogger().addHandler(console)
-        self.current_services = ["reddit"]
-        # error checking
-        if service.lower() not in self.current_services:
-            logging.error("Service invalid")
-            return None
-        # login to service
-        if service == "reddit":
-            self.login = praw.Reddit(
-                client_id=config["reddit"]["client_id"],
-                client_secret=config["reddit"]["client_secret"],
-                password=config["reddit"]["password"],
-                user_agent=config["reddit"]["user_agent"],
-                username=config["reddit"]["username"])
-        # make sure necessary filestructure is in place
-        needed_directories = ["temp", "save", f"save/{service}"]
-        for d in needed_directories:
-            if not os.path.isdir(d): os.mkdir(d)
-        if not os.path.exists(f"save/{service}"):
-            open(f"save/{service}", "w+")
-            f.close()
-        # set object variables
-        self.service = service
-        self.debug = debug
-        self.places = config[service]["places"]
-        # seent it list is a little more complicated
-        self.seent = {}
-        for f in os.listdir(f"save/{service}"):
-            savefile = open(f"save/{service}/{f}", "r").read().split("\n")
-            self.seent[f.split("/")[-1]] = [item for item in savefile] # dict faster
-
-
-    ### HELPER METHODS
-    # helper method to clean out folder (delete all contents)
-    # expected structure: [["temp/a/1", "temp/a/2"], [], [], ["temp/e/1"]]
-    def remove_folders(self, folders_list):
-        for folder in folders_list:
-            if self.debug: logging.info(f"Clearing folder {folder}")
-            for file in folder:
-                os.remove(file)
-            if len(folder) > 0:
-                subfolder = "/".join(folder[0].split("/")[:-1])
-                os.rmdir(subfolder)
-
-    # helper method to download media
-    def download_media(self, url, filename):
-        # get file first
-        if self.debug: logging.info(f"Downloading {url} info {filename}")
-        resp = requests.get(url)
-        if resp.ok:
-            # make sure directory structure exists
-            structure = filename.split("/")
-            for i in range(len(structure[:-1])):
-                d = "/".join(structure[0:i+1])
-                if not os.path.isdir(d): os.mkdir(d)
-            # write the downloaded content to file
-            with open(filename, "wb+") as f:
-                f.write(resp.content)
-
-    # reddit helper method to return the post type
-    def get_post_type(self, post):
-        if post.url[8] == 'i': return "image"
-        if post.url[8] == 'v': return "video"
-        if post.url[23:30] == "gallery": return "gallery"
-        return "unknown"
-
-    # helper to save a list with a limit to a savefile
-    def create_savefile(self, places, limit):
-        # write to seent list memory and return posts
-        for place in places:
-            if self.debug: logging.info(f"Creating savefile save/{self.service}/{place}")
-            new_seent = [k for k in self.seent[place] if k != ""]
-            if len(new_seent) > limit: new_seent = new_seent[:limit]
-            open(f"save/{self.service}/{place}", "w").write("\n".join(new_seent))
-
-
-    ### REDDIT METHODS
-    # gets posts from a given subreddit
-    def reddit_scrape(self, sub_name, limit):
-        # make sure seent list can store files for this sub
-        if sub_name not in self.seent:
-            self.seent[sub_name] = []
-            if not os.path.exists(f"save/{self.service}/{sub_name}"):
-                f = open(f"save/{self.service}/{sub_name}", "w+")
-                f.close()
-        # get posts that aren't in seent list
-        post_list = []
-        for p in self.login.subreddit(sub_name).new(limit=limit):
-            if p.id not in self.seent[sub_name]:
-                if self.debug: logging.info(f"Scraping post {p.id}")
-                post_list.append(p)
-                self.seent[sub_name] = [p.id] + self.seent[sub_name]
-        return post_list
-
-    # gets posts form all subreddits
-    def reddit_scrape_all(self, sub_names, limit):
-        subposts = {}
-        for sub in sub_names:
-            subposts[sub] = self.reddit_scrape(sub, limit)
-        return subposts
-
-    # downloads a given post; media is stored in temp/post_id/n
-    # returns a list of the stored file locations for that post
-    def reddit_download(self, post):
-        def make_gallery_urls():
-            nonlocal post
-            urls = []
-            for m in post.media_metadata:
-                mimetype = post.media_metadata[m]["m"]
-                end = mimetype[mimetype.find("/")+1:]
-                urls.append(f"https://i.redd.it/{m}.{end}")
-            return urls
-
-        # get the media URLs in array
-        reddit_urls = []
-        post_type = self.get_post_type(post)
-        if post_type == "image":
-            reddit_urls = [post.url]
-        elif post_type == "video":
-            raw_url = post.media["reddit_video"]["fallback_url"]
-            reddit_urls = [raw_url[:raw_url.find("?")]]
-        elif post_type == "gallery":
-            reddit_urls = make_gallery_urls()
-
-        # download all media
-        local_urls = []
-        i = 0
-        for url in reddit_urls:
-            i += 1
-            name = f"temp/{post.id}/{i}"
-            if self.debug: logging.info(f"Downloading {url} ({i}/{len(reddit_urls)})")
-            self.download_media(url, name)
-            local_urls.append(name)
-
-        return local_urls
-
-    # uses reddit_download to get all posts' media in a list of posts
-    # takes a list of posts, not a list of subs
-    # returns a list of lists, one list per post containing the local download locations for that post
-    def reddit_download_all(self, posts):
-        image_locations = []
-        for post in posts:
-            image_locations.append(self.download(post))
-        return image_locations
-
-
-    ### WRAPPER METHODS; these should be the ones called directly
-    # gets posts from a given service's place (ie, a subreddit or twitter feed)
-    def scrape(self, place, limit=10):
-        if self.debug: logging.info(f"Scraping {self.service}: {place}... ")
-        if self.service == "reddit":
-            result = self.reddit_scrape(place, limit)
-        if self.debug: logging.info(f"Done scraping {self.service}: {place}.")
-        return result
-    # gets posts from a gives service's places (ie, multiple subreddits or feeds)
-    def scrape_all(self, places=None, limit=10):
-        if places == None: places = self.places
-        if self.service == "reddit":
-            result = self.reddit_scrape_all(places, limit)
-        return result
-    # downloads a given post's media and return the locations
-    def download(self, post):
-        if self.service == "reddit":
-            if self.debug: logging.info(f"Downloading {post.id}... ")
-            result = self.reddit_download(post)
-        if self.debug: logging.info(f"Done downloading {post.id}.")
-        return result
-    # downloads a list of post's media and returns a list of the locations
-    def download_all(self, posts):
-        if self.service == "reddit":
-            post_ids = [p.id for p in posts]
-            result = self.reddit_download_all(posts)
-        return result
-    # creates the savefile for a list of posts.
-    def remember(self, places=None, limit=10):
-        if places == None: places = self.places
-        if self.debug: logging.info(f"Remembering {', '.join(places)}...")
-        self.create_savefile(places, limit)
-        if self.debug: logging.info(f"Remembered {', '.join(places)}.")
-
-    ### TOOTER METHODS (reddit only for now)
-    # builds a toot for convenience
-    def build_toot(self, masto, post):
-        toot = {}
-        toot["text"] = post.title
-        if self.get_post_type(post) == "video": toot["text"] += f"\n\n{post.url}"
-        local_media = self.download(post)
-        toot["media"] = masto.upload_all_media(local_media)
-        return toot
-    # toots all posts in list
-    def toot_posts(self, masto, posts):
-        for post in posts:
-            to_toot = self.build_toot(masto, post)
-            masto.toot(to_toot["text"], to_toot["media"])
-        return True
-
-    ### RUNNING METHODS
-    def run(self, masto, places=None, limit=10):
-        if self.debug: logging.info(f"Running {self.service}.")
-        if places == None: places = self.places
-        subs = self.scrape_all(places=places, limit=limit)
-        for sub in subs:
-            self.toot_posts(masto, subs[sub])
-        self.remember()
+
+    # post random if it has been a while
+    print("Keeping lively")
+    service.keep_lively()

 def main():
     while True:
@@ -271,12 +47,11 @@ def main():
         config = json.load(open('config.json', 'r'))
         # make bots
         masto = bot(config)
-        reddit = scraper("reddit", config, debug=True)
+        reddit = scraper("reddit", config, low_activity_random=True)
         # run bots
-        reddit.run(masto)
-        # buffer time bc posts only happen so often
-        time.sleep(60)
+        run(masto, reddit)
+        # buffer time bc posts only happen so often, so why check constantly
+        time.sleep(5)

 if __name__ == "__main__":
     main()
-scrape_all
\ No newline at end of file
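A side note on the "Debugging logging" TODO item: bot.py, helper.py, reddit.py, and scraper.py all report progress via logging.info(...), but nothing in this diff configures the root logger, so at the default WARNING level those messages are silently dropped (which may be why run() falls back to print()). A minimal sketch of what main() could add to surface them; the format string is just illustrative:

    import logging

    # raise the root logger to INFO so the logging.info(...) calls
    # in bot/helper/reddit/scraper actually appear
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s")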
diff --git a/bot.py b/bot.py
new file mode 100644
index 0000000..ec7241c
--- /dev/null
+++ b/bot.py
@@ -0,0 +1,25 @@
+from mastodon import Mastodon
+import logging
+
+# Mastodon bot to post things
+class bot():
+    def __init__(self, config, debug=False):
+        self.debug = debug
+        self.masto = Mastodon(access_token=config["mastodon"]["access_token"], api_base_url=config["mastodon"]["host"])
+
+    # uploads media to mastodon, returns the mastodon ID
+    # specify mimetype of video files as "video/mp4" to avoid error
+    def upload_media(self, filename, mimetype=None):
+        logging.info(f"Uploading media {filename}")
+        return self.masto.media_post(filename, mime_type=mimetype)
+
+    # uploads all given media
+    def upload_all_media(self, filenames):
+        ids = []
+        for fn in filenames:
+            ids.append(self.upload_media(fn))
+        return ids
+
+    def toot(self, text, media=None):
+        logging.info(f"Posting:\n  Text: {text}")
+        print("self.masto.status_post(text, media_ids=media)")  # dry run: the real status_post call is stubbed out in this commit
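For reference, a minimal sketch of driving the bot class directly, assuming a config.json like the sample above. The media path is a placeholder, and remember that toot() is a dry run in this commit, so nothing is actually posted:

    import json
    from bot import bot

    config = json.load(open("config.json", "r"))
    masto = bot(config)

    # upload one local file, then post a toot referencing it
    media_ids = masto.upload_all_media(["temp/abc123/1"])  # placeholder path
    masto.toot("hello from autotoot", media_ids)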
"/".join(folder[0].split("/")[:-1]) + os.rmdir(subfolder) + + # helper method to download media + def download_media(url, filename): + # get file first + logging.info(f"Downloading {url} info {filename}") + resp = requests.get(url) + if resp.ok: + # make sure directory structure exists + structure = filename.split("/") + for i in range(len(structure[:-1])): + d = "/".join(structure[0:i+1]) + if not os.path.isdir(d): os.mkdir(d) + # write the downloaded content to file + with open(filename, "wb+") as f: + f.write(resp.content) + + # reddit helper method to return the post type + def get_post_type(post): + print(post.url) + if post.url[8] == 'i': return "image" + if post.url[8] == 'v': return "video" + if post.url[23:30] == "gallery": return "gallery" + return "unknown" + + # returns True if the ts1 is older than ts2 + # tsx should be a timestamp value + def ts_older(ts1, ts2): + # timedelta of `hours` + hours_delta = datetime.fromtimestamp(ts2) - datetime.fromtimestamp(0) + # timedelta of timestamp + stamp_delta = datetime.fromtimestamp(ts1) + stamp_delta = datetime.now() - stamp_delta + return stamp_delta > hours_delta + + # returns True if place hasn't had a post in the past 12 hours according + # to the savefile + def been_awhile(seent_time, hours=12): + long_time = 60 * 60 * hours + return helper.ts_older(int(seent_time), long_time) + + # takes in a ListingGenerator (list of reddit posts) and + # reverses it + def reddit_listify(LG): + return [p for p in LG] diff --git a/reddit.py b/reddit.py new file mode 100644 index 0000000..be56342 --- /dev/null +++ b/reddit.py @@ -0,0 +1,129 @@ +from helper import helper +import praw +import json +import time +import logging + +class reddit_scraper: + def __init__(self, config): + self.login = praw.Reddit( + client_id=config["reddit"]["client_id"], + client_secret=config["reddit"]["client_secret"], + password=config["reddit"]["password"], + user_agent=config["reddit"]["user_agent"], + username=config["reddit"]["username"]) + self.places = config["reddit"]["places"] + savefile = open("savefile.json", "r") + savefile = json.load(savefile) + try: self.seent = savefile["reddit"] + except: self.seent = {} + + + ### REDDIT METHODS + # gets posts from a given subreddit + def scrape(self, sub, limit): + # make sure self.seent has the sub, add if not + if sub not in self.seent: self.seent[sub] = time.time() + # get posts that aren't in seent list + post_list = [] + posts = self.login.subreddit(sub).new(limit=limit) + posts = helper.reddit_listify(posts) + for p in posts[::-1]: + if helper.ts_older(p.created, self.seent[sub]): + break + logging.info(f"Scraping post {p.id}") + post_list.append(p) + self.seent[sub] = p.created + return post_list + + # scrapes all subreddits + def scrape_all(self, limit): + subposts = {} + for place in self.places: + subposts[place] = self.scrape(place, limit) + return subposts + + # downloads a given post; media is stored in temp/post_id/n + # returns a list of the stored file locations for that post + def download(self, post): + def make_gallery_urls(): + nonlocal post + urls = [] + for m in post.media_metadata: + mimetype = post.media_metadata[m]["m"] + end = mimetype[mimetype.find("/")+1:] + urls.append(f"https://i.redd.it/{m}.{end}") + return urls + # video is sketchy, sorta WIP but maybe impossible + # to have consistently. 
diff --git a/reddit.py b/reddit.py
new file mode 100644
index 0000000..be56342
--- /dev/null
+++ b/reddit.py
@@ -0,0 +1,129 @@
+from helper import helper
+import praw
+import json
+import time
+import logging
+
+class reddit_scraper:
+    def __init__(self, config):
+        self.login = praw.Reddit(
+            client_id=config["reddit"]["client_id"],
+            client_secret=config["reddit"]["client_secret"],
+            password=config["reddit"]["password"],
+            user_agent=config["reddit"]["user_agent"],
+            username=config["reddit"]["username"])
+        self.places = config["reddit"]["places"]
+        with open("savefile.json", "r") as f:
+            savefile = json.load(f)
+        # fall back to an empty seen-list if there's no saved reddit state yet
+        self.seent = savefile.get("reddit", {})
+
+
+    ### REDDIT METHODS
+    # gets posts from a given subreddit
+    def scrape(self, sub, limit):
+        # make sure self.seent has the sub, add if not
+        if sub not in self.seent: self.seent[sub] = time.time()
+        # get posts that aren't in seent list
+        post_list = []
+        posts = self.login.subreddit(sub).new(limit=limit)
+        posts = helper.reddit_listify(posts)
+        # walk oldest-to-newest, skipping anything at or before the saved timestamp
+        for p in posts[::-1]:
+            if p.created <= self.seent[sub]:
+                continue
+            logging.info(f"Scraping post {p.id}")
+            post_list.append(p)
+            self.seent[sub] = p.created
+        return post_list
+
+    # scrapes all subreddits
+    def scrape_all(self, limit):
+        subposts = {}
+        for place in self.places:
+            subposts[place] = self.scrape(place, limit)
+        return subposts
+
+    # downloads a given post; media is stored in temp/post_id/n
+    # returns a list of the stored file locations for that post
+    def download(self, post):
+        def make_gallery_urls():
+            nonlocal post
+            urls = []
+            for m in post.media_metadata:
+                mimetype = post.media_metadata[m]["m"]
+                end = mimetype[mimetype.find("/")+1:]
+                urls.append(f"https://i.redd.it/{m}.{end}")
+            return urls
+
+        # video is sketchy, sorta WIP, and maybe impossible to support
+        # consistently; this function does its best
+        def try_video_urls(post):
+            # try the media keys reddit has used for video
+            for key in ("video", "reddit_video"):
+                try:
+                    raw_url = post.media[key]["fallback_url"]
+                    return [raw_url[:raw_url.find("?")]]
+                except (KeyError, TypeError):
+                    continue
+            return []
+
+        # get the media URLs in array
+        urls = []
+        post_type = helper.get_post_type(post)
+        if post_type == "image":
+            urls = [post.url]
+        elif post_type == "video":
+            urls = try_video_urls(post)
+        elif post_type == "gallery":
+            urls = make_gallery_urls()
+
+        urls = []  # neuter downloads while testing: nothing is fetched or attached
+
+        # download all media
+        local_urls = []
+        for i, url in enumerate(urls, start=1):
+            name = f"temp/{post.id}/{i}"
+            logging.info(f"Downloading {url} ({i}/{len(urls)})")
+            helper.download_media(url, name)
+            local_urls.append(name)
+
+        return local_urls
+
+    # checks each sub and fetches a random post if it has been a while
+    # (note: the random post isn't tooted here yet; that wiring is still TODO)
+    def keep_lively(self):
+        for sub in self.places:
+            if helper.been_awhile(self.seent[sub]):
+                self.random_post(sub)
+
+    # gets a random post from the given subreddit
+    # (praw returns None here if the subreddit has the random feature disabled)
+    def random_post(self, place):
+        return self.login.subreddit(place).random()
+
+    # writes this service's seen timestamps back to savefile.json
+    def remember(self):
+        print(f"{self.seent}")  # debug: show what is about to be saved
+        with open("savefile.json", "r") as f:
+            savefile = json.load(f)
+        savefile["reddit"] = self.seent
+        with open("savefile.json", "w") as f:
+            f.write(json.dumps(savefile))
+
+    ### TOOTER METHODS
+    # takes a post and returns a dict of the toot text and media IDs
+    def build_toot(self, masto, post):
+        toot = {}
+        toot["text"] = post.title
+        if helper.get_post_type(post) == "video": toot["text"] += f"\n\n{post.url}"
+        local_media = self.download(post)
+        toot["media"] = masto.upload_all_media(local_media)
+        return toot
+
+    # toots all posts in list
+    def toot_posts(self, masto, posts):
+        for post in posts:
+            to_toot = self.build_toot(masto, post)
+            masto.toot(to_toot["text"], to_toot["media"])
+        return True
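remember() stores self.seent under a per-service key, so after a run savefile.json should look something like the sketch below (subreddit names and timestamps are illustrative). Each value is the created timestamp of the newest post handled for that sub, or the time.time() recorded when the sub was first seen:

    {
        "reddit": {
            "some_subreddit": 1650000000.0,
            "another_subreddit": 1650003600.0
        }
    }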
") + result = self.login.download(post) + logging.info(f"Done downloading {post.id}.") + return result + + # downloads a list of post's media and returns a list of the locations + def download_all(self, posts): + post_ids = [p.id for p in posts] + locations = [] + for post in post_ids: + locations.append(self.login.download(post)) + return locations + + # creates the savefile for a list of posts. + def remember(self): + logging.info(f"Remembering {self.service}...") + self.login.remember() + logging.info(f"Remembered {self.service}.") + + # posts for each place if it has been a while + def keep_lively(self): + self.login.keep_lively() + + # posts a random post from the given place + def random_post(self, place): + logging.info(f"Getting random post for {place}") + return self.login.random_post(place) + + ### TOOTER METHODS + # takes a toot and returns a dict of the text and media IDs + def build_toot(self, masto, post): + return self.login.build_toot(masto, post) + + # toots all posts in list + def toot_posts(self, masto, posts): + for post in posts: + to_toot = self.build_toot(masto, post) + masto.toot(to_toot["text"], to_toot["media"]) + return True