#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# A Fediverse (decentralized social network, for instance using Mastodon) bot
"""Image bot for the Fediverse.

The bot can either post a single image (``--img``) or listen to the account's
notification stream (``--stream``) and answer mentions with an image.  Images
come from a local directory tree, from a URL built from a collection id, or
from the Unsplash random-photo API, depending on ``--source``.

Replies are rate-limited per sender with small counter files stored under
``limiter/hour/`` and ``limiter/minute/``.
"""

import argparse
import json
import logging
import os
import random
import re
import shutil
import sys
import time
from io import BytesIO
from logging.handlers import RotatingFileHandler
from pprint import pprint
from random import randint

import requests
from lxml import html
from mastodon import StreamListener
from PIL import Image

from utils.config import get_parameter, init_log, init_mastodon

config_file = "config.txt"
secrets_filepath = get_parameter("secrets_filepath", config_file)
log_filepath = get_parameter("log_filepath", config_file)
blacklist_filepath = get_parameter("blacklist_filepath", config_file)
collection_filepath = get_parameter("collection_filepath", config_file)

log = init_log(log_filepath)
mastodon = init_mastodon(config_file, secrets_filepath)

# Accounts ("acct" strings) that never get an answer.
with open(blacklist_filepath, 'r') as blacklist_file:
    BLACKLIST = json.load(blacklist_file)

mime_dict = {
    '.jpg': 'image/jpeg',
    '.jpe': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
}

# Images wider than this are scaled down (keeping the aspect ratio) before upload.
MAX_IMAGE_WIDTH = 2048

# Seconds to wait for a remote HTTP server before giving up.
HTTP_TIMEOUT = 30

# Values accepted by --source.
IMAGE_SOURCES = ("local", "distant", "unsplash-random")


def _pick_random_image(root):
    """Return the path of a random file found by walking down from *root*.

    At each level one entry is drawn uniformly among the sub-directories and
    the files whose name does not contain ".zip"; a directory is descended
    into, a file ends the walk.

    Raises:
        IndexError: when a visited directory holds no eligible entry.
    """
    rng = random.SystemRandom()
    directory = root
    while True:
        eligible = [
            name for name in os.listdir(directory)
            if os.path.isdir(os.path.join(directory, name)) or ".zip" not in name
        ]
        if not eligible:
            # Filtering up front avoids retrying forever on a zip-only directory.
            raise IndexError("no usable image in " + directory)
        candidate = os.path.join(directory, rng.choice(eligible))
        if not os.path.isdir(candidate):
            return candidate
        directory = candidate


def _post_as_jpeg(mastodon, image):
    """Encode *image* as an RGB JPEG in memory and upload it with media_post."""
    buffer = BytesIO()
    # JPEG has no alpha channel or palette, so normalise the mode first.
    image.convert('RGB').save(buffer, format='JPEG')
    return mastodon.media_post(buffer.getvalue(), 'image/jpeg')


def _load_collections():
    """Return the JSON content of the file named by ``collection_filepath``."""
    with open(collection_filepath, 'r') as collection_file:
        return json.load(collection_file)


def post_img_local(mastodon, text, log, config):
    """Upload a random image from the local ``img_path`` directory tree.

    Images wider than ``MAX_IMAGE_WIDTH`` are scaled down and re-encoded as
    JPEG; smaller ones are uploaded untouched with the MIME type found in
    ``mime_dict`` for their extension.

    Args:
        mastodon: client object providing ``media_post``.
        text: unused, kept for interface compatibility.
        log: logger used for debug/error messages.
        config: configuration file read through ``get_parameter``.

    Returns:
        Whatever ``mastodon.media_post`` returns.
    """
    image_path = _pick_random_image(get_parameter("img_path", config))

    with Image.open(image_path) as im:
        width, height = im.size
        if width > MAX_IMAGE_WIDTH:
            scale = MAX_IMAGE_WIDTH / width
            new_height = max(1, int(height * scale))
            return _post_as_jpeg(mastodon, im.resize((MAX_IMAGE_WIDTH, new_height)))

    log.debug("no resize")
    with open(image_path, "rb") as image_file:
        image_byte = image_file.read()

    ext = os.path.splitext(image_path)[1]
    mime = mime_dict.get(ext.lower())
    if mime is None:
        log.error(ext + " is not present on mime_dict, please add this")
    return mastodon.media_post(image_byte, mime)


def post_unsplash_random_image(mastodon, log, config):
    """Upload a random Unsplash photo and build its credit line.

    When the collection file lists at least one collection, the photo is
    drawn from one of them picked at random.

    Args:
        mastodon: client object providing ``media_post``.
        log: unused, kept for interface compatibility.
        config: configuration file read through ``get_parameter``.

    Returns:
        A dict with ``'media_dict'`` (result of ``media_post``) and ``'toot'``
        (credit text naming the photographer and linking to the photo).

    Raises:
        requests.RequestException: on timeout or HTTP error status.
    """
    params = {"client_id": get_parameter("unsplash_client_id", config)}
    collections = _load_collections()
    if collections:
        params["collections"] = str(random.choice(collections))

    response = requests.get(
        "https://api.unsplash.com/photos/random",
        params=params,
        timeout=HTTP_TIMEOUT,
    )
    # Fail with the HTTP error rather than a KeyError on the error payload.
    response.raise_for_status()
    randim_json = response.json()

    randim_url = "{}&q=85&crop=entropy&cs=tinysrgb&w=2048&fit=max".format(
        randim_json['urls']['raw'])
    img_response = requests.get(randim_url, timeout=HTTP_TIMEOUT)
    img_response.raise_for_status()

    media_dict = _post_as_jpeg(mastodon, Image.open(BytesIO(img_response.content), "r"))
    toot = "Shot by {} ({})\n{}".format(
        randim_json['user']['name'],
        randim_json['user']['links']['html'],
        randim_json['links']['html'])
    return {'media_dict': media_dict, 'toot': toot}


def post_img_distant(mastodon, text, log, config):
    """Upload an image downloaded from the configured ``collection_url``.

    Args:
        mastodon: client object providing ``media_post``.
        text: unused, kept for interface compatibility.
        log: unused, kept for interface compatibility.
        config: configuration file read through ``get_parameter``.

    Returns:
        Whatever ``mastodon.media_post`` returns.

    Raises:
        ValueError: when the collection file lists no collection.
        requests.RequestException: on timeout or HTTP error status.
    """
    collection_url = get_parameter("collection_url", config)
    collections = _load_collections()
    if not collections:
        raise ValueError("no collection listed in " + collection_filepath)

    # NOTE(review): the search string below is empty, which makes str.replace
    # insert the collection id between every character of the URL. It looks
    # like a placeholder token was lost from this literal -- confirm against
    # the collection_url format used in config.txt and restore it.
    collection_url = collection_url.replace("", str(random.choice(collections)))

    response = requests.get(collection_url, timeout=HTTP_TIMEOUT)
    response.raise_for_status()
    return _post_as_jpeg(mastodon, Image.open(BytesIO(response.content), "r"))


cleanr = re.compile(r'<.*?>')


def cleanhtml(raw_html):
    """Return *raw_html* with every ``<...>`` span removed."""
    return re.sub(cleanr, '', raw_html)


def _is_sensitive():
    """Return True when the ``sensitive`` parameter is set to "yes"."""
    return get_parameter("sensitive", config_file) == "yes"


def _fetch_media(source):
    """Upload an image from *source*; return ``(media_dict, credit_text)``.

    ``credit_text`` is None unless the source provides one (Unsplash).

    Raises:
        ValueError: when *source* is not one of ``IMAGE_SOURCES``.
    """
    if source == "local":
        return post_img_local(
            mastodon, get_parameter("default_text", config_file), log, config_file), None
    if source == "distant":
        return post_img_distant(
            mastodon, get_parameter("default_text", config_file), log, config_file), None
    if source == "unsplash-random":
        resp = post_unsplash_random_image(mastodon, log, config_file)
        return resp['media_dict'], resp['toot']
    raise ValueError("unknown image source: %r" % (source,))


def _limiter_path(period, sender):
    """Return the counter file path for *sender* under ``limiter/<period>/``."""
    # The account name comes from the network: keep it inside the directory.
    safe_name = sender.replace("/", "_").replace("\\", "_")
    return os.path.join("limiter", period, safe_name)


def _read_counter(path):
    """Return the integer stored in *path*, or 0 when it is empty or invalid."""
    with open(path, 'r') as counter_file:
        content = counter_file.read().strip()
    try:
        return int(content or 0)
    except ValueError:
        log.warning("Invalid counter in %s, assuming 0" % path)
        return 0


def _write_counter(path, value):
    """Overwrite *path* with *value*, creating the parent directory if needed."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    # Mode "w" truncates, so a shorter number never leaves stale digits behind.
    with open(path, 'w') as counter_file:
        counter_file.write(str(value))


def _consume_quota(path, window_seconds, limit_parameter):
    """Count one request against the counter file *path*.

    The counter restarts at 1 when the file is missing or was last modified
    at least *window_seconds* ago. Otherwise it is incremented while it is
    below the limit read from the configuration parameter *limit_parameter*.

    Returns:
        True when the request is allowed, False when the limit is reached.
    """
    if not os.path.isfile(path):
        log.debug("Sender file not exist")
        _write_counter(path, 1)
        return True

    log.debug("Sender file exist")
    age = int(time.time()) - int(os.stat(path).st_mtime)
    if age >= window_seconds:
        log.debug("file is too old")
        _write_counter(path, 1)
        return True

    log.debug("file is young")
    limit = int(get_parameter(limit_parameter, config_file))
    count = _read_counter(path)
    if count < limit:
        log.debug("Sender have less of limit requests")
        _write_counter(path, count + 1)
        return True

    log.debug("Sender have more of limit requests")
    return False


def _refund_quota(path):
    """Give back one request previously counted in the counter file *path*."""
    _write_counter(path, max(0, _read_counter(path) - 1))


class BotListener(StreamListener):
    """Stream listener answering mentions with an image."""

    def __init__(self, args):
        """Store the parsed command-line *args*; ``args.source`` picks the image source."""
        super().__init__()
        self.args = args

    # use only notification
    def on_notification(self, notification):
        """Reply with an image to a mention sent by a non-bot, non-blacklisted account.

        The sender is limited to ``limit_hour`` requests per hour and ``limit``
        requests per minute; a request refused by the minute limit is not
        charged against the hourly one.
        """
        # NOTE(review): the original layout was lost, so it is unclear whether
        # "Nevermind" was logged for non-mentions or for bot senders; both
        # ignored cases log it here.
        if notification['type'] != 'mention':
            log.debug("Nevermind")
            return
        log.debug("Got a mention")
        # Only an explicit False counts as "not a bot", as before.
        if notification["account"]["bot"] is not False:
            log.debug("Nevermind")
            return

        sender = notification['account']['acct']
        if sender in BLACKLIST:
            log.info("Service refused to %s" % sender)
            return

        hour_path = _limiter_path("hour", sender)
        minute_path = _limiter_path("minute", sender)
        if not _consume_quota(hour_path, 3600, "limit_hour"):
            log.debug("no picture send :(")
            return
        if not _consume_quota(minute_path, 60, "limit"):
            _refund_quota(hour_path)
            log.debug("no picture send :(")
            return

        status = notification['status']
        reply_to_id = status['id']
        visibility = status['visibility']
        if visibility == 'public':
            # Replies should not show up on public timelines.
            visibility = 'unlisted'

        # Address the author and everyone else mentioned, except the bot itself.
        bot_name = get_parameter("name_bot", config_file)
        recipients = [status["account"]["acct"]]
        recipients.extend(
            mention["acct"] for mention in status['mentions']
            if mention["acct"] != bot_name)
        text = "".join("@" + acct + " " for acct in recipients)

        sensitive = _is_sensitive()
        media_dict, credit = _fetch_media(self.args.source)
        if credit is not None:
            text = text + "\n" + credit

        mastodon.status_post(
            text, reply_to_id,
            media_ids=[media_dict],
            sensitive=sensitive,
            visibility=visibility,
            spoiler_text=get_parameter("spoiler_text", config_file))


def main():
    """Parse the command line, then post one image or listen for mentions."""
    parser = argparse.ArgumentParser(description='Choose between image or streaming')
    parser.add_argument("-i", "--img", action='store_true', help="post image")
    parser.add_argument("-s", "--source", choices=IMAGE_SOURCES,
                        help="Source of image [ local | distant | unsplash-random ]")
    parser.add_argument("--stream", action="store_true", help="stream user profile")
    args = parser.parse_args()

    if not (args.img or args.stream):
        print("Require an argument. Use --help to display help")
        return
    if args.source is None:
        # Without a source there is no image to post in either mode.
        parser.error("--source is required with --img or --stream")

    if args.img:
        text = get_parameter("default_text", config_file)
        media_dict, credit = _fetch_media(args.source)
        if credit is not None:
            text = credit
        mastodon.status_post(
            text, None,
            media_ids=[media_dict],
            sensitive=_is_sensitive(),
            visibility='public',
            spoiler_text=get_parameter("spoiler_text", config_file))
        sys.exit()

    stream = BotListener(args)
    while True:
        try:
            log.info("Start listening...")
            mastodon.stream_user(stream)
        except Exception as error:
            # Top-level boundary: log and reconnect rather than die.
            log.warning('General exception caught: ' + str(error))
        # NOTE(review): the original layout was lost, so it is unclear whether
        # this pause sat inside the except block or at loop level; loop level
        # also throttles a reconnect after a clean return.
        time.sleep(0.5)


if __name__ == "__main__":
    main()