This repository has been archived on 2021-02-17. You can view files and clone it, but cannot push or open issues or pull requests.
masto-image-bot/bot.py

284 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
# coding: utf-8
# -*- coding: utf-8 -*-
# A Fediverse (decentralized social network, for instance using Mastodon) bot
from mastodon import StreamListener
from lxml import html
from logging.handlers import RotatingFileHandler
from pprint import pprint
from random import randint
from utils.config import get_parameter, init_log, init_mastodon
from PIL import Image
from io import BytesIO
import requests, os, random, sys, time, json, logging, argparse, re, shutil
config_file = "config.txt"
secrets_filepath = get_parameter("secrets_filepath", config_file)
log_filepath = get_parameter("log_filepath", config_file)
blacklist_filepath = get_parameter("blacklist_filepath", config_file)
collection_filepath = get_parameter("collection_filepath", config_file)
log = init_log(log_filepath)
mastodon = init_mastodon(config_file, secrets_filepath)
blacklist_file = open(blacklist_filepath,'r')
BLACKLIST = json.loads(blacklist_file.read())
blacklist_file.close()
mime_dict = {'.jpg': 'image/jpeg', '.jpe': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif'}
def post_img_local(mastodon, text, log, config):
img_path = get_parameter("img_path", config)
continu = True;
while continu:
secure_random = random.SystemRandom()
file = secure_random.choice(os.listdir(img_path+"/"))
if os.path.isdir(img_path+file):
img_path = img_path+file+"/"
else:
if ".zip" not in file:
continu = False
im = Image.open(img_path+ file)
width, height = im.size
NEW_WIDTH = 2048
if width > 2048:
difference_percent = NEW_WIDTH / width
new_height = height * difference_percent
size = new_height, NEW_WIDTH
im = im.resize((int(NEW_WIDTH), int(new_height)))
im.save('resize_img.jpg')
file = "resize_img.jpg"
shutil.copyfile(file, "/tmp/"+file)
else:
log.debug("no resize")
shutil.copyfile(img_path+file, "/tmp/"+file)
image_byte = open("/tmp/"+file, "rb").read()
file, ext = os.path.splitext(file)
os.remove("/tmp/"+file+ext)
try:
mime = mime_dict[str.lower(ext)]
except KeyError:
mime = None;
log.error(ext + " is not present on mime_dict, please add this")
pass
media_dict = mastodon.media_post(image_byte, mime)
return media_dict;
def post_unsplash_random_image(mastodon, log, config):
collection_url = get_parameter("collection_url", config)
unsplash_client_id = get_parameter("unsplash_client_id", config)
collecion_file = open(collection_filepath,'r')
collections = json.loads(collecion_file.read())
collecion_file.close()
count_collection = len(collections)-1
if count_collection > -1:
id_collection = randint(0,count_collection)
collection_url="&collections="+str(collections[id_collection])
else:
collection_url=''
response = requests.get("https://api.unsplash.com/photos/random?client_id="+unsplash_client_id+collection_url)
randim_json = json.loads(response.text)
randim_url = "{}&q=85&crop=entropy&cs=tinysrgb&w=2048&fit=max".format(randim_json['urls']['raw'])
img_response = requests.get(randim_url)
pattern = Image.open(BytesIO(img_response.content), "r").convert('RGB')
pattern.save('output.jpg')
media_dict = mastodon.media_post("output.jpg")
toot = "Shot by {} ({})\n{}".format(randim_json['user']['name'], randim_json['user']['links']['html'], randim_json['links']['html'])
return { 'media_dict': media_dict, 'toot': toot };
def post_img_distant(mastodon, text, log, config):
collection_url = get_parameter("collection_url", config)
collecion_file = open(collection_filepath,'r')
collections = json.loads(collecion_file.read())
collecion_file.close()
count_collection = len(collections)-1
id_collection = randint(0,count_collection)
collection_url = collection_url.replace("<collection>", str(collections[id_collection]))
response = requests.get(collection_url)
pattern = Image.open(BytesIO(response.content), "r").convert('RGB')
pattern.save('output.jpg')
media_dict = mastodon.media_post("output.jpg")
return media_dict;
cleanr = re.compile('<.*?>')
def cleanhtml(raw_html):
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
class BotListener(StreamListener):
def __init__(self, args):
self.args = args
# use only notification
def on_notification(self, notification):
# catch only mention in notification
if notification['type'] == 'mention':
log.debug("Got a mention")
if notification["account"]["bot"] == False:
sender = notification['account']['acct'] # Get sender name
if sender in BLACKLIST:
log.info("Service refused to %s" % sender)
return
sender_hour_filename = "limiter/hour/" + sender; # Forge file for limiter
sender_minute_filename = "limiter/minute/" + sender; # Forge file for limiter
if os.path.isfile(sender_hour_filename): # Check if file exist
log.debug("Sender file exist")
statbuf = os.stat(sender_hour_filename)
last_edit = int(statbuf.st_mtime)
ts = int(time.time())
if ts - last_edit > 3599: # check if file is modified 1 hour after last edition
log.debug("file is too old")
f = open(sender_hour_filename,'w')
f.write(str(1)) # reset counter
f.close()
can_continue = True
else:
log.debug("file is young")
f = open(sender_hour_filename,'r+')
limit = int(get_parameter("limit_hour", config_file))
number_of_mention = int(f.read())
if number_of_mention < limit: # limit of mention per hour is limit_hour
log.debug("Sender have less of limit requests")
f.seek(0)
f.write(str(number_of_mention + 1))
can_continue = True
else:
log.debug("Sender have more of limit requests")
can_continue = False # if number of mention is for, user can't receive anything
f.close()
else: # File not exist, create it and initialise it
log.debug("Sender file not exist")
f = open(sender_hour_filename,"w+")
f.write(str(1))
f.close()
can_continue = True
if can_continue:
if os.path.isfile(sender_minute_filename): # Check if file exist
log.debug("Sender file exist")
statbuf = os.stat(sender_minute_filename)
last_edit = int(statbuf.st_mtime)
ts = int(time.time())
if ts - last_edit > 59: # check if file is modified 1 minute after last edition
log.debug("file is too old")
f = open(sender_minute_filename,'w')
f.write(str(1)) # reset counter
f.close()
can_continue = True
else:
log.debug("file is young")
f = open(sender_minute_filename,'r+')
limit = int(get_parameter("limit", config_file))
number_of_mention = int(f.read())
if number_of_mention < limit: # limit of mention per minute is 4
log.debug("Sender have less of limit requests")
f.seek(0)
f.write(str(number_of_mention + 1))
can_continue = True
else:
log.debug("Sender have more of limit requests")
can_continue = False # if number of mention is for, user can't receive anything
file = open(sender_hour_filename,'r+')
number_of_mention = int(file.read())
file.seek(0)
file.write(str(number_of_mention - 1))
file.close()
f.close()
else: # File not exist, create it and initialise it
log.debug("Sender file not exist")
f = open(sender_minute_filename,"w+")
f.write(str(1))
f.close()
can_continue = True
if can_continue:
id = notification['status']['id']
visibility = notification['status']['visibility']
if visibility == 'public':
visibility = 'unlisted'
mentions = notification['status']['mentions']
text = "@" + notification['status']["account"]["acct"] + " "
for mention in mentions:
if mention["acct"] != get_parameter("name_bot", config_file):
text = text + "@" + mention["acct"] + " "
if get_parameter("sensitive", config_file) == "yes":
sensitive = True
else:
sensitive = False
if self.args.source == "local":
media_dict = post_img_local(mastodon, get_parameter("default_text", config_file), log, config_file)
elif self.args.source == "distant":
media_dict = post_img_distant(mastodon, get_parameter("default_text", config_file), log, config_file)
elif self.args.source == "unsplash-random":
resp = post_unsplash_random_image(mastodon, log, config_file)
text = text + "\n" + resp['toot']
media_dict = resp['media_dict']
mastodon.status_post(text, id, media_ids=[media_dict], sensitive=sensitive, visibility=visibility, spoiler_text=get_parameter("spoiler_text", config_file))
else:
log.debug("no picture send :(")
pass
else:
log.debug("Nevermind")
def main():
parser = argparse.ArgumentParser(description='Choose between image or streaming')
parser.add_argument("-i", "--img", action='store_true', help="post image")
parser.add_argument("-s", "--source", help="Source of image [ local | distant | unsplash-random ]")
parser.add_argument("--stream", action="store_true", help="stream user profile")
args = parser.parse_args()
if args.img:
text = get_parameter("default_text", config_file)
if args.source == "local":
media_dict = post_img_local(mastodon, get_parameter("default_text", config_file), log, config_file)
elif args.source == "distant":
media_dict = post_img_distant(mastodon, get_parameter("default_text", config_file), log, config_file)
elif args.source == "unsplash-random":
resp = post_unsplash_random_image(mastodon, log, config_file)
text = resp['toot']
media_dict = resp['media_dict']
if get_parameter("sensitive", config_file) == "yes":
sensitive = True
else:
sensitive = False
mastodon.status_post(text, None, media_ids=[media_dict], sensitive=sensitive, visibility='public', spoiler_text=get_parameter("spoiler_text", config_file))
sys.exit()
elif args.stream:
stream = BotListener(args);
while True:
try:
log.info("Start listening...")
mastodon.stream_user(stream)
except Exception as error:
log.warning('General exception caught: ' + str(error))
time.sleep(0.5)
else:
print("Require an argument. Use --help to display help")
main()