This repository has been archived on 2021-02-17. You can view files and clone it, but cannot push or open issues or pull requests.
masto-image-bot/bot.py

284 lines
12 KiB
Python
Raw Permalink Normal View History

2018-06-19 22:21:27 +02:00
#!/usr/bin/env python3
2018-05-16 09:16:33 +02:00
# coding: utf-8
# -*- coding: utf-8 -*-
2018-05-16 16:16:23 +02:00
# A Fediverse (decentralized social network, for instance using Mastodon) bot
2018-06-18 13:04:08 +02:00
from mastodon import StreamListener
2018-05-17 11:00:10 +02:00
from lxml import html
2018-05-16 13:36:41 +02:00
from logging.handlers import RotatingFileHandler
2018-06-21 13:56:51 +02:00
from pprint import pprint
2018-06-22 22:08:06 +02:00
from random import randint
2018-06-18 13:05:56 +02:00
from utils.config import get_parameter, init_log, init_mastodon
2018-06-22 22:08:06 +02:00
from PIL import Image
from io import BytesIO
2018-05-16 13:36:41 +02:00
import requests, os, random, sys, time, json, logging, argparse, re, shutil
2018-06-22 22:08:06 +02:00
# Every runtime setting is looked up in this configuration file.
config_file = "config.txt"
secrets_filepath = get_parameter("secrets_filepath", config_file)
log_filepath = get_parameter("log_filepath", config_file)
blacklist_filepath = get_parameter("blacklist_filepath", config_file)
collection_filepath = get_parameter("collection_filepath", config_file)
log = init_log(log_filepath)
mastodon = init_mastodon(config_file, secrets_filepath)

# Senders found in BLACKLIST are refused by the stream listener. The context
# manager closes the file even when the JSON turns out to be malformed.
with open(blacklist_filepath, 'r') as blacklist_file:
    BLACKLIST = json.load(blacklist_file)

# Lower-case file extension -> MIME type passed along with local uploads.
mime_dict = {
    '.jpg': 'image/jpeg',
    '.jpe': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
}
2018-06-22 18:18:51 +02:00
2018-06-22 22:08:06 +02:00
def post_img_local(mastodon, text, log, config):
    """Upload a randomly chosen local image and return the resulting media dict.

    Starting from the directory configured as ``img_path``, a random entry is
    drawn; sub-directories are descended into until a file is picked. Files
    whose name contains ".zip" are never selected. An image wider than 2048
    pixels is scaled down to that width (aspect ratio kept) and re-encoded as
    JPEG in memory; any other file is uploaded byte-for-byte.

    Args:
        mastodon: Client whose ``media_post(data, mime)`` performs the upload.
        text: Unused; kept so existing callers keep working.
        log: Logger used for the debug and error messages.
        config: Configuration file handed to ``get_parameter``.

    Returns:
        Whatever ``mastodon.media_post`` returns for the uploaded image.

    Raises:
        IndexError: If a visited directory contains no selectable entry.
    """
    max_width = 2048
    current_dir = get_parameter("img_path", config)
    secure_random = random.SystemRandom()

    # Walk down random sub-directories until a file is drawn. Filtering the
    # archives out before drawing (instead of re-drawing after the fact) gives
    # the same uniform pick but cannot loop forever on a directory that only
    # holds archives. os.path.join also copes with an ``img_path`` that has no
    # trailing slash.
    while True:
        candidates = [
            entry for entry in os.listdir(current_dir)
            if ".zip" not in entry
            or os.path.isdir(os.path.join(current_dir, entry))
        ]
        picked = secure_random.choice(candidates)
        picked_path = os.path.join(current_dir, picked)
        if not os.path.isdir(picked_path):
            break
        current_dir = picked_path

    with Image.open(picked_path) as im:
        width, height = im.size
        if width > max_width:
            new_height = max(1, int(height * (max_width / width)))
            resized = im.resize((max_width, new_height))
            # JPEG cannot store alpha or palette data, so normalise such
            # images first instead of letting the save call fail.
            if resized.mode not in ("RGB", "L", "CMYK"):
                resized = resized.convert("RGB")
            # Encode in memory: no stray resize_img.jpg or /tmp copy is left.
            encoded = BytesIO()
            resized.save(encoded, format="JPEG")
            image_byte = encoded.getvalue()
            ext = ".jpg"
        else:
            log.debug("no resize")
            with open(picked_path, "rb") as image_file:
                image_byte = image_file.read()
            ext = os.path.splitext(picked)[1]

    # An unknown extension is reported but the upload is still attempted with
    # no MIME type, exactly as before.
    mime = mime_dict.get(ext.lower())
    if mime is None:
        log.error(ext + " is not present on mime_dict, please add this")
    return mastodon.media_post(image_byte, mime)
2018-06-22 22:08:06 +02:00
def post_unsplash_random_image(mastodon, log, config):
    """Fetch a random Unsplash photo, upload it, and build its credit text.

    When the collections file lists at least one collection, one of them is
    picked at random and the photo is requested from that collection;
    otherwise the request is made without a collection filter. The photo is
    downloaded at a width of 2048, converted to RGB, written to
    ``output.jpg`` in the working directory and uploaded from there.

    Args:
        mastodon: Client whose ``media_post(path)`` performs the upload.
        log: Unused; kept so existing callers keep working.
        config: Configuration file handed to ``get_parameter``.

    Returns:
        A dict with ``'media_dict'`` (the ``media_post`` result) and
        ``'toot'`` (photographer name, profile link and photo page link).

    Raises:
        requests.RequestException: If either HTTP request fails, times out,
            or answers with an error status.
    """
    request_timeout = 30  # seconds; without it a stalled server blocks forever

    unsplash_client_id = get_parameter("unsplash_client_id", config)
    with open(collection_filepath, 'r') as collection_file:
        collections = json.load(collection_file)

    query = {"client_id": unsplash_client_id}
    if collections:
        query["collections"] = str(random.choice(collections))

    response = requests.get(
        "https://api.unsplash.com/photos/random",
        params=query,
        timeout=request_timeout,
    )
    # Fail on the HTTP error itself rather than on a confusing KeyError below.
    response.raise_for_status()
    randim_json = response.json()

    randim_url = "{}&q=85&crop=entropy&cs=tinysrgb&w=2048&fit=max".format(randim_json['urls']['raw'])
    img_response = requests.get(randim_url, timeout=request_timeout)
    img_response.raise_for_status()

    pattern = Image.open(BytesIO(img_response.content), "r").convert('RGB')
    pattern.save('output.jpg')
    media_dict = mastodon.media_post("output.jpg")

    toot = "Shot by {} ({})\n{}".format(
        randim_json['user']['name'],
        randim_json['user']['links']['html'],
        randim_json['links']['html'],
    )
    return {'media_dict': media_dict, 'toot': toot}
2018-06-22 22:08:06 +02:00
def post_img_distant(mastodon, text, log, config):
    """Download an image from a random configured collection and upload it.

    The ``collection_url`` setting is a URL template; its ``<collection>``
    placeholder is replaced by one entry picked at random from the
    collections file. The downloaded image is converted to RGB, written to
    ``output.jpg`` in the working directory and uploaded from there.

    Args:
        mastodon: Client whose ``media_post(path)`` performs the upload.
        text: Unused; kept so existing callers keep working.
        log: Unused; kept so existing callers keep working.
        config: Configuration file handed to ``get_parameter``.

    Returns:
        Whatever ``mastodon.media_post`` returns for the uploaded image.

    Raises:
        ValueError: If the collections file contains no collection.
        requests.RequestException: If the download fails, times out, or
            answers with an error status.
    """
    request_timeout = 30  # seconds; without it a stalled server blocks forever

    url_template = get_parameter("collection_url", config)
    with open(collection_filepath, 'r') as collection_file:
        collections = json.load(collection_file)

    # Same exception type an empty list produced before, with a usable message.
    if not collections:
        raise ValueError("no collection listed in " + str(collection_filepath))

    collection_url = url_template.replace("<collection>", str(random.choice(collections)))
    response = requests.get(collection_url, timeout=request_timeout)
    # Do not hand an HTML error page to the image decoder.
    response.raise_for_status()

    pattern = Image.open(BytesIO(response.content), "r").convert('RGB')
    pattern.save('output.jpg')
    return mastodon.media_post("output.jpg")
# Matches a single tag, shortest match first. DOTALL lets "." cross line
# breaks so a tag that wraps onto the next line is matched as well.
cleanr = re.compile(r'<.*?>', re.DOTALL)


def cleanhtml(raw_html):
    """Return ``raw_html`` with every ``<...>`` tag removed.

    Only the tags are dropped; the text between them (including entities
    such as ``&amp;``) is returned untouched.
    """
    return cleanr.sub('', raw_html)
2018-05-16 10:04:05 +02:00
2018-06-18 17:42:27 +02:00
class BotListener(StreamListener):
    """Stream listener that answers mentions with an image.

    Each sender is rate-limited through two small counter files,
    ``limiter/hour/<acct>`` and ``limiter/minute/<acct>``. A counter holds
    the number of requests served since the file was last written; once the
    file is older than its window the counter starts again at 1.
    """

    HOUR_WINDOW = 3600    # seconds
    MINUTE_WINDOW = 60    # seconds

    def __init__(self, args):
        """Keep the parsed command-line arguments (``args.source`` is used)."""
        super().__init__()
        self.args = args

    @staticmethod
    def _counter_path(period, sender):
        """Return the counter file path of ``sender`` for ``period``."""
        # The account name comes from a remote server: neutralise path
        # separators so it cannot point outside the limiter directory.
        safe_sender = sender.replace("/", "_").replace(os.sep, "_")
        return "limiter/" + period + "/" + safe_sender

    @staticmethod
    def _read_counter(path):
        """Return the integer stored in the counter file ``path``."""
        with open(path, 'r') as counter_file:
            return int(counter_file.read())

    @staticmethod
    def _write_counter(path, value):
        """Replace the content of the counter file ``path`` with ``value``."""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        # Mode 'w' truncates first: overwriting in place would turn a "10"
        # that is lowered to 9 into "90".
        with open(path, 'w') as counter_file:
            counter_file.write(str(value))

    def _take_slot(self, path, window_seconds, limit_parameter):
        """Count one request against ``path``; return False when over limit.

        The limit is read from the configuration key ``limit_parameter``.
        A refused request leaves the counter file untouched.
        """
        if not os.path.isfile(path):
            log.debug("Sender file not exist")
            self._write_counter(path, 1)
            return True

        log.debug("Sender file exist")
        age = int(time.time()) - int(os.stat(path).st_mtime)
        if age >= window_seconds:
            log.debug("file is too old")
            self._write_counter(path, 1)  # reset counter
            return True

        log.debug("file is young")
        limit = int(get_parameter(limit_parameter, config_file))
        served = self._read_counter(path)
        if served < limit:
            log.debug("Sender have less of limit requests")
            self._write_counter(path, served + 1)
            return True

        log.debug("Sender have more of limit requests")
        return False

    # use only notification
    def on_notification(self, notification):
        """Reply with an image to a mention sent by a non-bot account.

        Notifications that are not mentions, mentions from accounts flagged
        as bots, blacklisted senders and senders over their hourly or
        per-minute limit get no reply. A public mention is answered as
        ``unlisted``; any other visibility is reused as is.
        """
        # catch only mention in notification
        if notification['type'] != 'mention':
            log.debug("Nevermind")
            return
        log.debug("Got a mention")
        # Only an explicit False counts as "not a bot".
        if notification["account"]["bot"] is not False:
            log.debug("Nevermind")
            return

        sender = notification['account']['acct']  # Get sender name
        if sender in BLACKLIST:
            log.info("Service refused to %s" % sender)
            return

        hour_path = self._counter_path("hour", sender)
        minute_path = self._counter_path("minute", sender)
        allowed = self._take_slot(hour_path, self.HOUR_WINDOW, "limit_hour")
        if allowed and not self._take_slot(minute_path, self.MINUTE_WINDOW, "limit"):
            # Refused by the minute limiter: give back the hourly slot that
            # was just taken.
            self._write_counter(hour_path, self._read_counter(hour_path) - 1)
            allowed = False
        if not allowed:
            log.debug("no picture send :(")
            return

        status = notification['status']
        visibility = status['visibility']
        if visibility == 'public':
            visibility = 'unlisted'

        # Address the author, then everyone else mentioned except the bot.
        bot_name = get_parameter("name_bot", config_file)
        text = "@" + status["account"]["acct"] + " "
        text += "".join(
            "@" + mention["acct"] + " "
            for mention in status['mentions']
            if mention["acct"] != bot_name
        )
        sensitive = get_parameter("sensitive", config_file) == "yes"

        source = self.args.source
        if source == "local":
            media_dict = post_img_local(mastodon, get_parameter("default_text", config_file), log, config_file)
        elif source == "distant":
            media_dict = post_img_distant(mastodon, get_parameter("default_text", config_file), log, config_file)
        elif source == "unsplash-random":
            resp = post_unsplash_random_image(mastodon, log, config_file)
            text = text + "\n" + resp['toot']
            media_dict = resp['media_dict']
        else:
            # Without this branch media_dict would be unbound below.
            log.error("Unknown image source: %s" % source)
            return

        mastodon.status_post(
            text,
            status['id'],
            media_ids=[media_dict],
            sensitive=sensitive,
            visibility=visibility,
            spoiler_text=get_parameter("spoiler_text", config_file),
        )
2018-06-19 22:06:59 +02:00
def main():
    """Parse the command line, then post one image or listen for mentions.

    ``--img`` posts a single public status with an image taken from
    ``--source`` and exits. ``--stream`` listens to the user stream with a
    ``BotListener`` and reconnects whenever the stream stops. With neither
    flag a short usage hint is printed.
    """
    known_sources = ("local", "distant", "unsplash-random")

    parser = argparse.ArgumentParser(description='Choose between image or streaming')
    parser.add_argument("-i", "--img", action='store_true', help="post image")
    parser.add_argument("-s", "--source", help="Source of image [ local | distant | unsplash-random ]")
    parser.add_argument("--stream", action="store_true", help="stream user profile")
    args = parser.parse_args()

    # Both modes need a usable source; reject a missing or misspelled one
    # up front instead of failing later on an unbound media_dict.
    if (args.img or args.stream) and args.source not in known_sources:
        parser.error("--source must be one of: " + ", ".join(known_sources))

    if args.img:
        text = get_parameter("default_text", config_file)
        if args.source == "local":
            media_dict = post_img_local(mastodon, get_parameter("default_text", config_file), log, config_file)
        elif args.source == "distant":
            media_dict = post_img_distant(mastodon, get_parameter("default_text", config_file), log, config_file)
        else:  # "unsplash-random": the credit line replaces the default text
            resp = post_unsplash_random_image(mastodon, log, config_file)
            text = resp['toot']
            media_dict = resp['media_dict']
        sensitive = get_parameter("sensitive", config_file) == "yes"
        mastodon.status_post(
            text,
            None,
            media_ids=[media_dict],
            sensitive=sensitive,
            visibility='public',
            spoiler_text=get_parameter("spoiler_text", config_file),
        )
        sys.exit()
    elif args.stream:
        stream = BotListener(args)
        while True:
            try:
                log.info("Start listening...")
                mastodon.stream_user(stream)
            except Exception as error:
                # Top-level boundary: log and reconnect rather than die.
                log.warning('General exception caught: ' + str(error))
            # Short pause before reconnecting, whether the stream failed or
            # simply ended, so the loop never spins.
            time.sleep(0.5)
    else:
        print("Require an argument. Use --help to display help")
2018-05-17 11:42:16 +02:00
2018-05-16 13:36:41 +02:00
# Run the bot only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
2018-05-16 09:16:33 +02:00