Skip to content

Commit

Permalink
Merge branch 'main' into MAM-2677-rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
jtomchak authored Oct 25, 2023
2 parents 17db38a + 541a041 commit 975fe3a
Show file tree
Hide file tree
Showing 18 changed files with 253 additions and 29 deletions.
3 changes: 3 additions & 0 deletions app/controllers/api/v3/channels_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ def subscribe
render json: @user
end

# Trigger user rebuild when unsubscribing from channel
# this is to clear out unwanted content from FY Feed
def unsubscribe
@mammoth = Mammoth::Channels.new
@user = @mammoth.unsubscribe(channel_id_param, acct_param)
UpdateForYouWorker.perform_async({ acct: acct_param, rebuild: true })
render json: @user
end

Expand Down
3 changes: 2 additions & 1 deletion app/controllers/api/v3/timelines/for_you_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class Api::V3::Timelines::ForYouController < Api::BaseController
after_action :insert_pagination_headers, only: [:show], unless: -> { @statuses.empty? }

def index
result = PersonalForYou.new.user(acct_param)
result = PersonalForYou.new.mammoth_user(acct_param)
render json: result
end

Expand Down Expand Up @@ -135,6 +135,7 @@ def acct_param
def for_you_params
params.permit(
:acct,
[enabled_channels: []],
:curated_by_mammoth,
:friends_of_friends,
:from_your_channels,
Expand Down
21 changes: 21 additions & 0 deletions app/controllers/api/v3/timelines/statuses_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

class Api::V3::Timelines::StatusesController < Api::BaseController
before_action :require_mammoth!

rescue_from Mammoth::StatusOrigin::NotFound do |e|
render json: { error: e.to_s }, status: 404
end

def show
origin = Mammoth::StatusOrigin.instance
@origins = origin.find(status_id_param)
render json: @origins, each_serializer: StatusOriginSerializer
end

private

def status_id_param
params.require(:id)
end
end
25 changes: 22 additions & 3 deletions app/lib/mammoth/channels.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,41 @@ class NotFound < StandardError; end
def channels_with_statuses
list(include_accounts: true).each do |channel|
account_ids = account_ids(channel[:accounts])
channel[:statuses] = statuses_from_channel_accounts(account_ids)
channel[:statuses] = statuses_from_channels(account_ids)
end
end

# Used in ForYou Feed
# Get Statuses from array of channels
# filter out based on per channel threshold
def select_channels_with_statuses(channels)
origin = Mammoth::StatusOrigin.instance
channels.flat_map do |channel|
account_ids = account_ids(channel[:accounts])
statuses_from_channel_accounts(account_ids)
statuses_with_accounts_from_channels(account_ids).filter_map { |s| engagment_threshold(s, channel[:fy_engagement_threshold]) }
.each { |s| origin.add_channel(s, channel) }
end
end

def statuses_from_channel_accounts(account_ids)
def statuses_from_channels(account_ids)
Status.where(account_id: account_ids,
created_at: (GO_BACK.hours.ago)..Time.current)
end

def statuses_with_accounts_from_channels(account_ids)
Status.includes([:account]).where(account_id: account_ids,
created_at: (GO_BACK.hours.ago)..Time.current)
end

# Check status for Channel's set level of engagment
# Filter out polls and replys
def engagment_threshold(wrapped_status, channel_engagment_setting)
status = wrapped_status.reblog? ? wrapped_status.reblog : wrapped_status

status_counts = status.reblogs_count + status.replies_count + status.favourites_count
status if status_counts >= channel_engagment_setting && status.in_reply_to_id.nil? && status.poll_id.nil?
end

# Returns an array of account id's
def account_ids(accounts)
usernames = accounts.pluck(:username)
Expand Down
67 changes: 67 additions & 0 deletions app/lib/mammoth/status_origin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# frozen_string_literal: true
# rubocop:disable all
require 'singleton'

module Mammoth

class StatusOrigin
include Singleton
include Redisable
class NotFound < StandardError; end


# Add Status and Reason to list
def add_channel(status, channel)
list_key = key(status[:id])
reason = channel_reason(status, channel)

add_reason(list_key, reason)
end

def add_mammoth_pick(status)
list_key = key(status[:id])
reason = mammoth_pick_reason(status)

add_reason(list_key, reason)
end

# Add reason by key id
# Expire Reason in 7 days
def add_reason(key, reason)
redis.sadd(key, reason)
redis.expire(key, 7.day.seconds)
end

def find(status_id)
list_key = key(status_id)
results = redis.smembers(list_key).map { |o|
payload = Oj.load(o, symbol_keys: true)
originating_account = Account.create(payload[:originating_account])
# StatusOrigin Active Model for serialization
::StatusOrigin.new(source: payload[:source], channel_id: payload[:channel_id], title: payload[:title], originating_account:originating_account )
}
# Throw Error if array find is empty
raise NotFound, 'status not found' unless results.length > 0
return results
end

private

# Redis key of a status
# @param [Integer] status id
# @param [Symbol] subtype
# @return [String]
def key(id, subtype = nil)
return "origin:for_you:#{id}" unless subtype
"origin:for_you:#{id}:#{subtype}"
end

def channel_reason(status, channel)
Oj.dump({source: "SmartList", channel_id: channel[:id], title: channel[:title], originating_account: status.account})
end

def mammoth_pick_reason(status)
Oj.dump({source: "MammothPick", originating_account: status.account })
end
end
end
48 changes: 44 additions & 4 deletions app/lib/personal_for_you.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ class PersonalForYou
ACCOUNT_RELAY_AUTH = "Bearer #{ENV.fetch('ACCOUNT_RELAY_KEY')}".freeze
ACCOUNT_RELAY_HOST = 'acctrelay.moth.social'

FEATURE_HOST = 'feature.moth.social'

# Cache Key for User
def key(acct)
"mammoth:user:#{acct}"
Expand Down Expand Up @@ -48,6 +50,22 @@ def acct_relay_users
results unless response.code != 200
end

# Aggregate mammoth user from AcctRelay with Feature Api
# If the for_you setting is personal return early
# If the for_you setting is public, get waitlist feature
# and check for enrollment.
# for_you_setting type can be 'public' | 'personal' | 'waitlist'
def mammoth_user(acct)
user = user(acct)
return user unless user[:for_you_settings][:type] == 'public'
# if for_you is public get the waitlist
waitlist = waitlist_status(acct)
if waitlist == 'enrolled'
user[:for_you_settings][:type] = 'waitlist'
end
user
end

# Get Mammoth user details
# Includes any settings/preferences/configurations for feeds
# Not caching user. If it becomes an issue cache it at the source. AcctRelay
Expand All @@ -58,6 +76,15 @@ def user(acct)
JSON.parse(response.body, symbolize_names: true)
end

# Get User Waitlist Status
# :waitlist will be 'none' | 'enrolled'
def waitlist_status(acct)
response = HTTP.headers({ Authorization: ACCOUNT_RELAY_AUTH, 'Content-Type': 'application/json' }).get(
"https://#{FEATURE_HOST}/api/v1/personalize?acct=#{acct}"
)
JSON.parse(response.body, symbolize_names: true)[:waitlist]
end

# Defined as a 'personalize' user on AccountRelay
# A Mammoth user will have thier foryou settings type listed as 'personal' once it's generated
# The default foryou settings type is 'public
Expand Down Expand Up @@ -109,13 +136,26 @@ def statuses_for_direct_follows(acct)
Status.where(account_id: account_ids, updated_at: 12.hours.ago..Time.current).limit(200)
end

# Get subscribed channels with full accounts
# Get enabled channels with full accounts
# Fetch statuses for those accounts
def statuses_for_subscribed_channels(user)
def statuses_for_enabled_channels(user)
channels = Mammoth::Channels.new
subscribed_channels = subscribed_channels(user)
enabled_channels = enabled_channels(user)

channels.select_channels_with_statuses(subscribed_channels)
channels.select_channels_with_statuses(enabled_channels)
end

# Only include channels from user has enabled
# Return channels with full account details array
# User's subscribed array from `/me` only has channel summary
def enabled_channels(user)
channels = mammoth_channels
for_you_settings = user[:for_you_settings]
enabled_channel_ids = for_you_settings[:enabled_channels]

channels.filter do |channel|
enabled_channel_ids.include?(channel[:id])
end
end

# Only include channels from user subscribed
Expand Down
11 changes: 11 additions & 0 deletions app/models/status_origin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ActiveModel Only for Serialization
class StatusOrigin
include ActiveModel::Model
include ActiveModel::Serialization

attr_accessor :source, :channel_id, :title, :originating_account

def initialize(attributes = {})
super
end
end
7 changes: 7 additions & 0 deletions app/serializers/status_origin_serializer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Required Source & Originating Account
# Channel_id & Title maybe be null
class StatusOriginSerializer < ActiveModel::Serializer
attributes :source, :title, :channel_id

belongs_to :originating_account, serializer: REST::AccountSerializer
end
2 changes: 1 addition & 1 deletion app/workers/for_you_feed_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class ForYouFeedWorker
include Redisable
include Sidekiq::Worker

sidekiq_options queue: 'pull', retry: 0
sidekiq_options queue: 'mammoth', retry: 0

MAX_ITEMS = 1000
MINIMUM_ENGAGMENT_ACTIONS = 2
Expand Down
14 changes: 1 addition & 13 deletions app/workers/scheduler/channel_mammoth_statuses_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,6 @@ def update_channel_feeds!

# Filter statuses based on engagment and push to feed.
def push_statuses(statuses, channel_id)
statuses.filter_map { |status| engagment_threshold(status) }
.each { |status| ChannelFeedWorker.perform_async(status['id'], channel_id) }
end

# Check status for Channel level of engagment
# Filter out polls and replys
def engagment_threshold(wrapped_status)
# enagagment threshold
engagment = MINIMUM_ENGAGMENT_ACTIONS
status = wrapped_status.reblog? ? wrapped_status.reblog : wrapped_status

status_counts = status.reblogs_count + status.replies_count + status.favourites_count
status if status_counts >= engagment && status.in_reply_to_id.nil? && status.poll_id.nil?
statuses.each { |status| ChannelFeedWorker.perform_async(status['id'], channel_id) }
end
end
2 changes: 1 addition & 1 deletion app/workers/scheduler/for_you_statuses_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Scheduler::ForYouStatusesScheduler
include Sidekiq::Worker
include JsonLdHelper

sidekiq_options retry: 0
sidekiq_options retry: 0, queue: 'mammoth'

def perform
owner_account = Account.local.where(username: FOR_YOU_OWNER_ACCOUNT)
Expand Down
13 changes: 8 additions & 5 deletions app/workers/update_for_you_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class UpdateForYouWorker
include Redisable
include Sidekiq::Worker

sidekiq_options retry: 0, queue: 'pull'
sidekiq_options retry: 0, queue: 'mammoth'

# Mammoth Curated List(OG List)

Expand Down Expand Up @@ -89,13 +89,12 @@ def push_indirect_following_status
end

# Channels Subscribed
# Include ONLY enabled_channels
def push_channels_status
user_setting = @user[:for_you_settings]
return if user_setting[:from_your_channels].zero?

@personal.statuses_for_subscribed_channels(@user)
.filter_map { |s| engagment_threshold(s, user_setting[:from_your_channels], 'channel') }
.each { |s| ForYouFeedWorker.perform_async(s['id'], @account.id, 'personal') }
@personal.statuses_for_enabled_channels(@user).each { |s| ForYouFeedWorker.perform_async(s['id'], @account.id, 'personal') }
end

# Mammoth Curated OG List
Expand All @@ -105,9 +104,13 @@ def push_mammoth_curated_status

curated_list = Mammoth::CuratedList.new
list_statuses = curated_list.curated_list_statuses
origin = Mammoth::StatusOrigin.instance

list_statuses.filter_map { |s| engagment_threshold(s, user_setting[:curated_by_mammoth], 'mammoth') }
.each { |s| ForYouFeedWorker.perform_async(s['id'], @account.id, 'personal') }
.each do |s|
origin.add_mammoth_pick(s)
ForYouFeedWorker.perform_async(s['id'], @account.id, 'personal')
end
end

# Check status for User's level of engagment
Expand Down
1 change: 1 addition & 0 deletions config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@

get '/admin', to: redirect('/admin/dashboard', status: 302)


draw(:api)

web_app_paths.each do |path|
Expand Down
1 change: 1 addition & 0 deletions config/sidekiq.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [mailers, 2]
- [pull]
- [scheduler]
- [mammoth]
:scheduler:
:listened_queues_only: true
:schedule:
Expand Down
Loading

0 comments on commit 975fe3a

Please sign in to comment.