thread-limit logic, finetuning + extended profile cache

This commit is contained in:
Arndt Brenschede 2020-02-08 18:31:22 +01:00
parent 4caea5f583
commit 42486f73f1
2 changed files with 125 additions and 48 deletions

View file

@ -13,16 +13,24 @@ import btools.expressions.BExpressionMetaData;
public final class ProfileCache public final class ProfileCache
{ {
private static BExpressionContextWay expctxWay;
private static BExpressionContextNode expctxNode;
private static File lastLookupFile; private static File lastLookupFile;
private static File lastProfileFile;
private static long lastLookupTimestamp; private static long lastLookupTimestamp;
private static long lastProfileTimestamp;
private static boolean profilesBusy; private BExpressionContextWay expctxWay;
private BExpressionContextNode expctxNode;
private File lastProfileFile;
private long lastProfileTimestamp;
private boolean profilesBusy;
private long lastUseTime;
private static ProfileCache[] apc = new ProfileCache[1];
private static boolean debug = Boolean.getBoolean( "debugProfileCache" );
public static synchronized void setSize( int size )
{
apc = new ProfileCache[size];
}
public static synchronized boolean parseProfile( RoutingContext rc ) public static synchronized boolean parseProfile( RoutingContext rc )
{ {
@ -43,20 +51,50 @@ public final class ProfileCache
rc.profileTimestamp = profileFile.lastModified() + rc.getKeyValueChecksum()<<24; rc.profileTimestamp = profileFile.lastModified() + rc.getKeyValueChecksum()<<24;
File lookupFile = new File( profileDir, "lookups.dat" ); File lookupFile = new File( profileDir, "lookups.dat" );
// invalidate cache at lookup-table update
if ( !(lookupFile.equals( lastLookupFile ) && lookupFile.lastModified() == lastLookupTimestamp ) )
{
if ( lastLookupFile != null )
{
System.out.println( "******** invalidating profile-cache after lookup-file update ******** " );
}
apc = new ProfileCache[apc.length];
lastLookupFile = lookupFile;
lastLookupTimestamp = lookupFile.lastModified();
}
ProfileCache lru = null;
int unusedSlot =-1;
// check for re-use // check for re-use
if ( expctxWay != null && expctxNode != null && !profilesBusy ) for( int i=0; i<apc.length; i++)
{ {
if ( profileFile.equals( lastProfileFile ) && lookupFile.equals( lastLookupFile ) ) ProfileCache pc = apc[i];
if ( pc != null )
{ {
if ( rc.profileTimestamp == lastProfileTimestamp if ( (!pc.profilesBusy) && profileFile.equals( pc.lastProfileFile ) )
&& lookupFile.lastModified() == lastLookupTimestamp )
{ {
rc.expctxWay = expctxWay; if ( rc.profileTimestamp == pc.lastProfileTimestamp )
rc.expctxNode = expctxNode; {
profilesBusy = true; rc.expctxWay = pc.expctxWay;
rc.expctxNode = pc.expctxNode;
rc.readGlobalConfig(); rc.readGlobalConfig();
pc.profilesBusy = true;
return true; return true;
} }
lru = pc; // name-match but timestamp-mismatch -> we overide this one
unusedSlot = -1;
break;
}
if ( lru == null || lru.lastUseTime > pc.lastUseTime )
{
lru = pc;
}
}
else if ( unusedSlot < 0 )
{
unusedSlot = i;
} }
} }
@ -78,22 +116,45 @@ public final class ProfileCache
rc.expctxWay.setAllTagsUsed(); rc.expctxWay.setAllTagsUsed();
} }
lastProfileTimestamp = rc.profileTimestamp; if ( lru == null || unusedSlot >= 0 )
lastLookupTimestamp = lookupFile.lastModified(); {
lastProfileFile = profileFile; lru = new ProfileCache();
lastLookupFile = lookupFile; if ( unusedSlot >= 0 )
expctxWay = rc.expctxWay; {
expctxNode = rc.expctxNode; apc[unusedSlot] = lru;
profilesBusy = true; if ( debug ) System.out.println( "******* adding new profile at idx=" + unusedSlot + " for " + profileFile );
}
}
if ( lru.lastProfileFile != null )
{
if ( debug ) System.out.println( "******* replacing profile of age " + ((System.currentTimeMillis()-lru.lastUseTime)/1000L) + " sec " + lru.lastProfileFile + "->" + profileFile );
}
lru.lastProfileTimestamp = rc.profileTimestamp;
lru.lastProfileFile = profileFile;
lru.expctxWay = rc.expctxWay;
lru.expctxNode = rc.expctxNode;
lru.profilesBusy = true;
lru.lastUseTime = System.currentTimeMillis();
return false; return false;
} }
public static synchronized void releaseProfile( RoutingContext rc ) public static synchronized void releaseProfile( RoutingContext rc )
{ {
// only the thread that holds the cached instance can release it for( int i=0; i<apc.length; i++)
if ( rc.expctxWay == expctxWay && rc.expctxNode == expctxNode )
{ {
profilesBusy = false; ProfileCache pc = apc[i];
if ( pc != null )
{
// only the thread that holds the cached instance can release it
if ( rc.expctxWay == pc.expctxWay && rc.expctxNode == pc.expctxNode )
{
pc.profilesBusy = false;
break;
}
}
} }
rc.expctxWay = null; rc.expctxWay = null;
rc.expctxNode = null; rc.expctxNode = null;

View file

@ -20,12 +20,13 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.PriorityQueue;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import btools.router.OsmNodeNamed; import btools.router.OsmNodeNamed;
import btools.router.OsmTrack; import btools.router.OsmTrack;
import btools.router.ProfileCache;
import btools.router.RoutingContext; import btools.router.RoutingContext;
import btools.router.RoutingEngine; import btools.router.RoutingEngine;
import btools.server.request.ProfileUploadHandler; import btools.server.request.ProfileUploadHandler;
@ -33,7 +34,7 @@ import btools.server.request.RequestHandler;
import btools.server.request.ServerHandler; import btools.server.request.ServerHandler;
import btools.util.StackSampler; import btools.util.StackSampler;
public class RouteServer extends Thread public class RouteServer extends Thread implements Comparable<RouteServer>
{ {
public static final String PROFILE_UPLOAD_URL = "/brouter/profile"; public static final String PROFILE_UPLOAD_URL = "/brouter/profile";
static final String HTTP_STATUS_OK = "200 OK"; static final String HTTP_STATUS_OK = "200 OK";
@ -47,8 +48,10 @@ public class RouteServer extends Thread
private Socket clientSocket = null; private Socket clientSocket = null;
private RoutingEngine cr = null; private RoutingEngine cr = null;
private volatile boolean terminated; private volatile boolean terminated;
private long starttime;
private static Object threadPoolSync = new Object(); private static Object threadPoolSync = new Object();
private static boolean debug = Boolean.getBoolean( "debugThreadPool" );
public void stopRouter() public void stopRouter()
{ {
@ -301,7 +304,9 @@ public class RouteServer extends Thread
int maxthreads = Integer.parseInt( args[4] ); int maxthreads = Integer.parseInt( args[4] );
TreeMap<Long,RouteServer> threadMap = new TreeMap<Long,RouteServer>(); ProfileCache.setSize( 2*maxthreads );
PriorityQueue<RouteServer> threadQueue = new PriorityQueue<RouteServer>();
ServerSocket serverSocket = args.length > 5 ? new ServerSocket(Integer.parseInt(args[3]),100,InetAddress.getByName(args[5])) : new ServerSocket(Integer.parseInt(args[3])); ServerSocket serverSocket = args.length > 5 ? new ServerSocket(Integer.parseInt(args[3]),100,InetAddress.getByName(args[5])) : new ServerSocket(Integer.parseInt(args[3]));
@ -317,43 +322,47 @@ public class RouteServer extends Thread
System.out.println( "*** sampling stacks into stacks.txt *** "); System.out.println( "*** sampling stacks into stacks.txt *** ");
} }
long last_ts = 0;
for (;;) for (;;)
{ {
Socket clientSocket = serverSocket.accept(); Socket clientSocket = serverSocket.accept();
RouteServer server = new RouteServer(); RouteServer server = new RouteServer();
server.serviceContext = serviceContext; server.serviceContext = serviceContext;
server.clientSocket = clientSocket; server.clientSocket = clientSocket;
server.starttime = System.currentTimeMillis();
// kill an old thread if thread limit reached // kill an old thread if thread limit reached
cleanupThreadList( threadMap ); cleanupThreadQueue( threadQueue );
if ( threadMap.size() >= maxthreads )
if ( debug ) System.out.println( "threadQueue.size()=" + threadQueue.size() );
if ( threadQueue.size() >= maxthreads )
{ {
synchronized( threadPoolSync ) synchronized( threadPoolSync )
{ {
// wait up to 2000ms (maybe notified earlier) // wait up to 2000ms (maybe notified earlier)
// to prevent killing short-running threads // to prevent killing short-running threads
threadPoolSync.wait( 2000 ); long maxage = server.starttime - threadQueue.peek().starttime;
} long maxWaitTime = 2000L-maxage;
cleanupThreadList( threadMap ); if ( debug ) System.out.println( "maxage=" + maxage + " maxWaitTime=" + maxWaitTime );
if ( threadMap.size() >= maxthreads ) if ( maxWaitTime > 0 )
{ {
threadPoolSync.wait( maxWaitTime );
}
}
cleanupThreadQueue( threadQueue );
if ( threadQueue.size() >= maxthreads )
{
if ( debug ) System.out.println( "stopping oldest thread..." );
// no way... stop the oldest thread // no way... stop the oldest thread
Long k = threadMap.firstKey(); threadQueue.poll().stopRouter();
RouteServer victim = threadMap.get( k );
threadMap.remove( k );
victim.stopRouter();
} }
} }
long ts = System.currentTimeMillis(); threadQueue.add( server );
while ( ts <= last_ts ) ts++;
threadMap.put( Long.valueOf( ts ), server );
last_ts = ts;
server.start(); server.start();
if ( debug ) System.out.println( "thread started..." );
} }
} }
@ -423,16 +432,16 @@ public class RouteServer extends Thread
bw.write( "\n" ); bw.write( "\n" );
} }
private static void cleanupThreadList( TreeMap<Long, RouteServer> threadMap ) private static void cleanupThreadQueue( PriorityQueue<RouteServer> threadQueue )
{ {
for ( ;; ) for ( ;; )
{ {
boolean removedItem = false; boolean removedItem = false;
for ( Map.Entry<Long, RouteServer> e : threadMap.entrySet() ) for ( RouteServer t : threadQueue )
{ {
if ( e.getValue().terminated ) if ( t.terminated )
{ {
threadMap.remove( e.getKey() ); threadQueue.remove( t );
removedItem = true; removedItem = true;
break; break;
} }
@ -443,4 +452,11 @@ public class RouteServer extends Thread
} }
} }
} }
@Override
public int compareTo( RouteServer t )
{
return starttime < t.starttime ? -1 : ( starttime > t.starttime ? 1 : 0 );
}
} }